This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8b568241f0c Add field `taskLimits` to worker select strategies (#16889)
8b568241f0c is described below
commit 8b568241f0c8ee30d4846d28cd37235f5a34d7d5
Author: Misha <[email protected]>
AuthorDate: Thu Mar 6 16:57:35 2025 +0100
Add field `taskLimits` to worker select strategies (#16889)
Changes
---------
- Add field `taskLimits` to the following worker select strategies:
`equalDistribution`, `equalDistributionWithCategorySpec`,
`fillCapacityWithCategorySpec`, `fillCapacity`
- Add sub-fields `maxSlotCountByType` and `maxSlotRatioByType` to
`taskLimits`
- Apply these limits per worker when assigning new tasks
---------
Co-authored-by: sviatahorau <[email protected]>
Co-authored-by: Benedict Jin <[email protected]>
Co-authored-by: Kashif Faraz <[email protected]>
---
docs/configuration/index.md | 14 ++
.../overlord/setup/WorkerBehaviorConfigTest.java | 3 +-
.../indexing/overlord/ImmutableWorkerInfo.java | 28 ++-
...PendingTaskBasedWorkerProvisioningStrategy.java | 6 +
...stributionWithAffinityWorkerSelectStrategy.java | 5 +-
...butionWithCategorySpecWorkerSelectStrategy.java | 21 ++-
.../EqualDistributionWorkerSelectStrategy.java | 22 ++-
...llCapacityWithAffinityWorkerSelectStrategy.java | 5 +-
...pacityWithCategorySpecWorkerSelectStrategy.java | 22 ++-
.../setup/FillCapacityWorkerSelectStrategy.java | 22 ++-
.../druid/indexing/overlord/setup/TaskLimits.java | 195 +++++++++++++++++++++
.../overlord/setup/WorkerBehaviorConfig.java | 2 +-
.../indexing/overlord/setup/WorkerSelectUtils.java | 37 +++-
.../indexing/overlord/ImmutableWorkerInfoTest.java | 5 +
.../indexing/overlord/RemoteTaskRunnerTest.java | 2 +-
.../PendingTaskBasedProvisioningStrategyTest.java | 2 +-
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 2 +-
...butionWithAffinityWorkerSelectStrategyTest.java | 149 +++++++++++++++-
...onWithCategorySpecWorkerSelectStrategyTest.java | 4 +-
.../EqualDistributionWorkerSelectStrategyTest.java | 14 +-
...pacityWithAffinityWorkerSelectStrategyTest.java | 145 ++++++++++++++-
...tyWithCategorySpecWorkerSelectStrategyTest.java | 4 +-
22 files changed, 660 insertions(+), 49 deletions(-)
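
The way the two `taskLimits` sub-fields combine can be sketched in a few lines. This is a minimal, standalone illustration, not the `TaskLimits` class added by this commit: the class name `TaskLimitSketch` and the methods `effectiveLimit` and `canRun` are hypothetical, and the arithmetic (floor of `ratio * totalCapacity`, clamped to at least 1 and at most the total, then the smaller of the two limits) is an assumption read from the `TaskLimits.java` hunk below.

```java
import java.util.Map;

// Hypothetical sketch; names are illustrative and not part of Druid.
final class TaskLimitSketch
{
  /**
   * Returns the effective slot limit for a task type, or null when
   * neither map has an entry for that type (meaning "no limit").
   */
  static Integer effectiveLimit(
      String taskType,
      Map<String, Integer> maxSlotCountByType,
      Map<String, Double> maxSlotRatioByType,
      int totalCapacity
  )
  {
    Integer absoluteLimit = maxSlotCountByType.get(taskType);
    Double ratio = maxSlotRatioByType.get(taskType);

    if (absoluteLimit == null && ratio == null) {
      return null;
    }
    if (ratio == null) {
      return absoluteLimit;
    }

    // floor(ratio * capacity), clamped to the range [1, capacity]
    int ratioLimit = Math.max(1, Math.min((int) Math.floor(ratio * totalCapacity), totalCapacity));

    // When both limits are present, the smaller one applies.
    return absoluteLimit == null ? ratioLimit : Math.min(absoluteLimit, ratioLimit);
  }

  /** A task fits if the slots still free under the limit cover what it needs. */
  static boolean canRun(Integer limit, int slotsUsedByType, int requiredCapacity)
  {
    return limit == null || limit - slotsUsedByType >= requiredCapacity;
  }

  public static void main(String[] args)
  {
    Map<String, Integer> counts = Map.of("index_parallel", 3);
    Map<String, Double> ratios = Map.of("index_parallel", 0.5, "compact", 0.25);

    System.out.println(effectiveLimit("index_parallel", counts, ratios, 10));
    System.out.println(effectiveLimit("compact", counts, ratios, 10));
    System.out.println(effectiveLimit("kill", counts, ratios, 10));
  }
}
```

With a capacity of 10, `index_parallel` is capped at 3 (the absolute limit is smaller than the ratio-derived 5), `compact` at 2, and `kill` has no limit. Under the clamping assumed here, a ratio of 0 would still allow one slot.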
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 0ed82017c3d..d75309b7ac3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1277,6 +1277,7 @@ This evenly distributes work across your Middle Managers.
|--------|-----------|-------|
|`type`|`equalDistribution`|required; must be `equalDistribution`|
|`affinityConfig`|[`AffinityConfig`](#affinityconfig) object|null (no
affinity)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
###### `equalDistributionWithCategorySpec`
@@ -1288,6 +1289,7 @@ This strategy doesn't work with `AutoScaler` since the
behavior is undefined.
|--------|-----------|-------|
|`type`|`equalDistributionWithCategorySpec`|required; must be
`equalDistributionWithCategorySpec`|
|`workerCategorySpec`|[`WorkerCategorySpec`](#workercategoryspec) object|null
(no worker category spec)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
The following example shows tasks of type `index_kafka` that default to
running on Middle Managers of category `c1`, except for tasks that write to
datasource `ds1`, which run on Middle Managers of category `c2`.
@@ -1323,6 +1325,7 @@ Middle Managers up to capacity simultaneously, rather
than a single Middle Manag
|--------|-----------|-------|
|`type`| `fillCapacity`|required; must be `fillCapacity`|
|`affinityConfig`| [`AffinityConfig`](#affinityconfig) object |null (no
affinity)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
###### `fillCapacityWithCategorySpec`
@@ -1334,6 +1337,7 @@ This strategy doesn't work with `AutoScaler` since the
behavior is undefined.
|--------|-----------|-------|
|`type`|`fillCapacityWithCategorySpec`.|required; must be
`fillCapacityWithCategorySpec`|
|`workerCategorySpec`|[`WorkerCategorySpec`](#workercategoryspec) object|null
(no worker category spec)|
+|`taskLimits`|[`TaskLimits`](#tasklimits) object|null (no limits)|
<a name="javascript-worker-select-strategy"></a>
@@ -1383,6 +1387,16 @@ field. If not provided, the default is to not use it at
all.
|`categoryMap`|A JSON map object mapping a task type String name to a
[CategoryConfig](#categoryconfig) object, by which you can specify category
config for different task type.|`{}`|
|`strong`|With weak workerCategorySpec (the default), tasks for a dataSource
may be assigned to other Middle Managers if the Middle Managers specified in
`categoryMap` are not able to run all pending tasks in the queue for that
dataSource. With strong workerCategorySpec, tasks for a dataSource will only
ever be assigned to their specified Middle Managers, and will wait in the
pending queue if necessary.|false|
+###### `taskLimits`
+
+The `taskLimits` field can be used with the `equalDistribution`,
`fillCapacity`, `equalDistributionWithCategorySpec` and
`fillCapacityWithCategorySpec` strategies.
+If you don't provide it, no task limits are applied.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`maxSlotCountByType`|A map where each key is a task type (`String`), and the
corresponding value represents the absolute limit on the number of task slots
that tasks of this type can occupy. The value is an `Integer` that is greater
than or equal to 0. For example, a value of 5 means that tasks of this type can
occupy up to 5 task slots in total. If both absolute and ratio limits are
specified for the same task type, the effective limit will be the smaller of
the absolute limit and the [...]
+|`maxSlotRatioByType`|A map where each key is a task type (`String`), and the
corresponding value is a `Double` which should be in the range [0, 1],
representing the ratio of task slots that tasks of this type can occupy. This
ratio defines the proportion of total task slots a task type can use,
calculated as `ratio * totalSlots`. If both absolute and ratio limits are
specified for the same task type, the effective limit will be the smaller of
the absolute limit and the limit derived fro [...]
+
###### CategoryConfig
|Property|Description|Default|
diff --git
a/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
b/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
index a47a79fa9bc..f777bad42e8 100644
---
a/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
+++
b/extensions-core/ec2-extensions/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfigTest.java
@@ -45,7 +45,8 @@ public class WorkerBehaviorConfigTest
new AffinityConfig(
ImmutableMap.of("foo", ImmutableSet.of("localhost")),
false
- )
+ ),
+ null
),
new EC2AutoScaler(
7,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index b1eafe0dc59..d0a938e7022 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -33,6 +34,8 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -45,6 +48,7 @@ public class ImmutableWorkerInfo
private final Worker worker;
private final int currCapacityUsed;
private final int currParallelIndexCapacityUsed;
+ private final Map<String, Integer> currCapacityUsedByTaskType;
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
@@ -57,6 +61,7 @@ public class ImmutableWorkerInfo
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("currParallelIndexCapacityUsed") int
currParallelIndexCapacityUsed,
+ @JsonProperty("currCapacityUsedByTaskType") Map<String, Integer>
currCapacityUsedByTaskType,
@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime,
@@ -66,6 +71,7 @@ public class ImmutableWorkerInfo
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
this.currParallelIndexCapacityUsed = currParallelIndexCapacityUsed;
+ this.currCapacityUsedByTaskType =
Configs.valueOrDefault(currCapacityUsedByTaskType, Collections.emptyMap());
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime;
@@ -76,12 +82,13 @@ public class ImmutableWorkerInfo
Worker worker,
int currCapacityUsed,
int currParallelIndexCapacityUsed,
+ Map<String, Integer> currCapacityUsedByTaskType,
Set<String> availabilityGroups,
Collection<String> runningTasks,
DateTime lastCompletedTaskTime
)
{
- this(worker, currCapacityUsed, currParallelIndexCapacityUsed,
availabilityGroups,
+ this(worker, currCapacityUsed, currParallelIndexCapacityUsed,
currCapacityUsedByTaskType, availabilityGroups,
runningTasks, lastCompletedTaskTime, null
);
}
@@ -94,7 +101,7 @@ public class ImmutableWorkerInfo
DateTime lastCompletedTaskTime
)
{
- this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks,
lastCompletedTaskTime, null);
+ this(worker, currCapacityUsed, 0, Collections.emptyMap(),
availabilityGroups, runningTasks, lastCompletedTaskTime, null);
}
/**
@@ -111,6 +118,8 @@ public class ImmutableWorkerInfo
int currParallelIndexCapacityUsed = 0;
ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();
+ Map<String, Integer> currCapacityUsedByTaskType = new HashMap<>();
+
for (final Map.Entry<String, TaskAnnouncement> entry :
announcements.entrySet()) {
final TaskAnnouncement announcement = entry.getValue();
@@ -126,6 +135,8 @@ public class ImmutableWorkerInfo
currParallelIndexCapacityUsed += requiredCapacity;
}
+ currCapacityUsedByTaskType.merge(announcement.getTaskType(), 1,
Integer::sum);
+
taskIds.add(taskId);
availabilityGroups.add(taskResource.getAvailabilityGroup());
}
@@ -135,6 +146,7 @@ public class ImmutableWorkerInfo
worker,
currCapacityUsed,
currParallelIndexCapacityUsed,
+ currCapacityUsedByTaskType,
availabilityGroups.build(),
taskIds.build(),
lastCompletedTaskTime,
@@ -160,6 +172,13 @@ public class ImmutableWorkerInfo
return currParallelIndexCapacityUsed;
}
+ @JsonProperty("currCapacityUsedByTaskType")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ public Map<String, Integer> getCurrCapacityUsedByTaskType()
+ {
+ return currCapacityUsedByTaskType;
+ }
+
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
@@ -243,6 +262,9 @@ public class ImmutableWorkerInfo
if (currParallelIndexCapacityUsed != that.currParallelIndexCapacityUsed) {
return false;
}
+ if (!currCapacityUsedByTaskType.equals(that.currCapacityUsedByTaskType)) {
+ return false;
+ }
if (!worker.equals(that.worker)) {
return false;
}
@@ -266,6 +288,7 @@ public class ImmutableWorkerInfo
int result = worker.hashCode();
result = 31 * result + currCapacityUsed;
result = 31 * result + currParallelIndexCapacityUsed;
+ result = 31 * result + currCapacityUsedByTaskType.hashCode();
result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode();
@@ -280,6 +303,7 @@ public class ImmutableWorkerInfo
"worker=" + worker +
", currCapacityUsed=" + currCapacityUsed +
", currParallelIndexCapacityUsed=" + currParallelIndexCapacityUsed +
+ ", currCapacityUsedByTaskType=" + currCapacityUsedByTaskType +
", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 0d715b53549..10122678893 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -482,10 +482,16 @@ public class PendingTaskBasedWorkerProvisioningStrategy
extends AbstractWorkerPr
int parallelIndexTaskCapacity =
task.getType().equals(ParallelIndexSupervisorTask.TYPE)
?
task.getTaskResource().getRequiredCapacity()
: 0;
+ int taskCapacity = task.getTaskResource().getRequiredCapacity();
+
+ final Map<String, Integer> typeSpecificCapacity = new
HashMap<>(immutableWorker.getCurrCapacityUsedByTaskType());
+ typeSpecificCapacity.merge(task.getType(), taskCapacity, Integer::sum);
+
return new ImmutableWorkerInfo(
immutableWorker.getWorker(),
immutableWorker.getCurrCapacityUsed() + 1,
immutableWorker.getCurrParallelIndexCapacityUsed() +
parallelIndexTaskCapacity,
+ typeSpecificCapacity,
Sets.union(
immutableWorker.getAvailabilityGroups(),
Sets.newHashSet(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
index 6d28ae78fd0..e60aeae018a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategy.java
@@ -29,9 +29,10 @@ public class
EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDist
{
@JsonCreator
public EqualDistributionWithAffinityWorkerSelectStrategy(
- @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+ @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+ @JsonProperty("taskLimits") TaskLimits taskLimits
)
{
- super(affinityConfig);
+ super(affinityConfig, taskLimits);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
index ec65693ac3d..b3715aec285 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategy.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -32,13 +33,16 @@ import java.util.Objects;
public class EqualDistributionWithCategorySpecWorkerSelectStrategy implements
WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;
+ private final TaskLimits taskLimits;
@JsonCreator
public EqualDistributionWithCategorySpecWorkerSelectStrategy(
- @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
+ @JsonProperty("workerCategorySpec") WorkerCategorySpec
workerCategorySpec,
+ @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
)
{
this.workerCategorySpec = workerCategorySpec;
+ this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
}
@JsonProperty
@@ -47,6 +51,12 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
return workerCategorySpec;
}
+ @JsonProperty
+ public TaskLimits getTaskLimits()
+ {
+ return taskLimits;
+ }
+
@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
@@ -60,7 +70,8 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
zkWorkers,
config,
workerCategorySpec,
- EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
+ EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers,
+ taskLimits
);
}
@@ -74,13 +85,14 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
return false;
}
final EqualDistributionWithCategorySpecWorkerSelectStrategy that =
(EqualDistributionWithCategorySpecWorkerSelectStrategy) o;
- return Objects.equals(workerCategorySpec, that.workerCategorySpec);
+ return Objects.equals(workerCategorySpec, that.workerCategorySpec)
+ && Objects.equals(taskLimits, that.taskLimits);
}
@Override
public int hashCode()
{
- return Objects.hash(workerCategorySpec);
+ return Objects.hash(workerCategorySpec, taskLimits);
}
@Override
@@ -88,6 +100,7 @@ public class
EqualDistributionWithCategorySpecWorkerSelectStrategy implements Wo
{
return "EqualDistributionWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
+ ", taskLimits=" + taskLimits +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
index afb020e9f7a..f3775d91e52 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
@@ -35,13 +37,16 @@ import java.util.Objects;
public class EqualDistributionWorkerSelectStrategy implements
WorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
+ private final TaskLimits taskLimits;
@JsonCreator
public EqualDistributionWorkerSelectStrategy(
- @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+ @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+ @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
)
{
this.affinityConfig = affinityConfig;
+ this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
}
@JsonProperty
@@ -50,6 +55,12 @@ public class EqualDistributionWorkerSelectStrategy
implements WorkerSelectStrate
return affinityConfig;
}
+ @JsonProperty
+ public TaskLimits getTaskLimits()
+ {
+ return taskLimits;
+ }
+
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
@@ -62,7 +73,8 @@ public class EqualDistributionWorkerSelectStrategy implements
WorkerSelectStrate
zkWorkers,
config,
affinityConfig,
- EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers
+ EqualDistributionWorkerSelectStrategy::selectFromEligibleWorkers,
+ taskLimits
);
}
@@ -83,13 +95,14 @@ public class EqualDistributionWorkerSelectStrategy
implements WorkerSelectStrate
return false;
}
final EqualDistributionWorkerSelectStrategy that =
(EqualDistributionWorkerSelectStrategy) o;
- return Objects.equals(affinityConfig, that.affinityConfig);
+ return Objects.equals(affinityConfig, that.affinityConfig)
+ && Objects.equals(taskLimits, that.taskLimits);
}
@Override
public int hashCode()
{
- return Objects.hash(affinityConfig);
+ return Objects.hash(affinityConfig, taskLimits);
}
@Override
@@ -97,6 +110,7 @@ public class EqualDistributionWorkerSelectStrategy
implements WorkerSelectStrate
{
return "EqualDistributionWorkerSelectStrategy{" +
"affinityConfig=" + affinityConfig +
+ ", taskLimits=" + taskLimits +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
index 46c303f9048..efa24b75b5f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategy.java
@@ -29,9 +29,10 @@ public class FillCapacityWithAffinityWorkerSelectStrategy
extends FillCapacityWo
{
@JsonCreator
public FillCapacityWithAffinityWorkerSelectStrategy(
- @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+ @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+ @JsonProperty("taskLimits") TaskLimits taskLimits
)
{
- super(affinityConfig);
+ super(affinityConfig, taskLimits);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
index 3dcdfe9a540..d50b19824cf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategy.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
@@ -32,13 +33,17 @@ import java.util.Objects;
public class FillCapacityWithCategorySpecWorkerSelectStrategy implements
WorkerSelectStrategy
{
private final WorkerCategorySpec workerCategorySpec;
+ private final TaskLimits taskLimits;
+
@JsonCreator
public FillCapacityWithCategorySpecWorkerSelectStrategy(
- @JsonProperty("workerCategorySpec") WorkerCategorySpec workerCategorySpec
+ @JsonProperty("workerCategorySpec") WorkerCategorySpec
workerCategorySpec,
+ @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
)
{
this.workerCategorySpec = workerCategorySpec;
+ this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
}
@JsonProperty
@@ -47,6 +52,12 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerS
return workerCategorySpec;
}
+ @JsonProperty
+ public TaskLimits getTaskLimits()
+ {
+ return taskLimits;
+ }
+
@Nullable
@Override
public ImmutableWorkerInfo findWorkerForTask(
@@ -60,7 +71,8 @@ public class FillCapacityWithCategorySpecWorkerSelectStrategy
implements WorkerS
zkWorkers,
config,
workerCategorySpec,
- FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
+ FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers,
+ taskLimits
);
}
@@ -74,13 +86,14 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerS
return false;
}
final FillCapacityWithCategorySpecWorkerSelectStrategy that =
(FillCapacityWithCategorySpecWorkerSelectStrategy) o;
- return Objects.equals(workerCategorySpec, that.workerCategorySpec);
+ return Objects.equals(workerCategorySpec, that.workerCategorySpec)
+ && Objects.equals(taskLimits, that.taskLimits);
}
@Override
public int hashCode()
{
- return Objects.hash(workerCategorySpec);
+ return Objects.hash(workerCategorySpec, taskLimits);
}
@Override
@@ -88,6 +101,7 @@ public class
FillCapacityWithCategorySpecWorkerSelectStrategy implements WorkerS
{
return "FillCapacityWithCategorySpecWorkerSelectStrategy{" +
"workerCategorySpec=" + workerCategorySpec +
+ ", taskLimits=" + taskLimits +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
index 47ac65b8271..bd8b53bacec 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
@@ -33,13 +35,16 @@ import java.util.Objects;
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
+ private final TaskLimits taskLimits;
@JsonCreator
public FillCapacityWorkerSelectStrategy(
- @JsonProperty("affinityConfig") AffinityConfig affinityConfig
+ @JsonProperty("affinityConfig") AffinityConfig affinityConfig,
+ @JsonProperty("taskLimits") @Nullable TaskLimits taskLimits
)
{
this.affinityConfig = affinityConfig;
+ this.taskLimits = Configs.valueOrDefault(taskLimits, TaskLimits.EMPTY);
}
@JsonProperty
@@ -48,6 +53,12 @@ public class FillCapacityWorkerSelectStrategy implements
WorkerSelectStrategy
return affinityConfig;
}
+ @JsonProperty
+ public TaskLimits getTaskLimits()
+ {
+ return taskLimits;
+ }
+
@Override
public ImmutableWorkerInfo findWorkerForTask(
final WorkerTaskRunnerConfig config,
@@ -60,7 +71,8 @@ public class FillCapacityWorkerSelectStrategy implements
WorkerSelectStrategy
zkWorkers,
config,
affinityConfig,
- FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers
+ FillCapacityWorkerSelectStrategy::selectFromEligibleWorkers,
+ taskLimits
);
}
@@ -81,13 +93,14 @@ public class FillCapacityWorkerSelectStrategy implements
WorkerSelectStrategy
return false;
}
final FillCapacityWorkerSelectStrategy that =
(FillCapacityWorkerSelectStrategy) o;
- return Objects.equals(affinityConfig, that.affinityConfig);
+ return Objects.equals(affinityConfig, that.affinityConfig)
+ && Objects.equals(taskLimits, that.taskLimits);
}
@Override
public int hashCode()
{
- return Objects.hash(affinityConfig);
+ return Objects.hash(affinityConfig, taskLimits);
}
@Override
@@ -95,6 +108,7 @@ public class FillCapacityWorkerSelectStrategy implements
WorkerSelectStrategy
{
return "FillCapacityWorkerSelectStrategy{" +
"affinityConfig=" + affinityConfig +
+ ", taskLimits=" + taskLimits +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
new file mode 100644
index 00000000000..39e6333c62f
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/TaskLimits.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.setup;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.task.Task;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Defines global limits for task execution using absolute slot counts and
proportional ratios.
+ *
+ * <p>Task count limits ({@code maxSlotCountByType}) define the maximum number
of slots per task type.
+ * Task ratios ({@code maxSlotRatioByType}) define the proportion of total
slots a task type can use.
+ * If both are set for a task type, the lower limit applies.</p>
+ *
+ * <p>Example:
+ * {@code maxSlotCountByType = {"index_parallel": 3, "query_controller": 5}}
+ * {@code maxSlotRatioByType = {"index_parallel": 0.5, "query_controller":
0.25}}</p>
+ */
+public class TaskLimits
+{
+ public static final TaskLimits EMPTY = new TaskLimits();
+ private final Map<String, Integer> maxSlotCountByType;
+ private final Map<String, Double> maxSlotRatioByType;
+
+ private TaskLimits()
+ {
+ this(Map.of(), Map.of());
+ }
+
+ @JsonCreator
+ public TaskLimits(
+ @JsonProperty("maxSlotCountByType") @Nullable Map<String, Integer>
maxSlotCountByType,
+ @JsonProperty("maxSlotRatioByType") @Nullable Map<String, Double>
maxSlotRatioByType
+ )
+ {
+ validateLimits(maxSlotCountByType, maxSlotRatioByType);
+ this.maxSlotCountByType = Configs.valueOrDefault(maxSlotCountByType,
Collections.emptyMap());
+ this.maxSlotRatioByType = Configs.valueOrDefault(maxSlotRatioByType,
Collections.emptyMap());
+ }
+
+ /**
+ * Determines whether the given task can be executed based on task limits
and available capacity.
+ *
+ * @param task The task to check.
+ * @param currentSlotsUsed The current capacity used by tasks of the same
type.
+ * @param totalCapacity The total available capacity across all workers.
+ * @return {@code true} if the task meets the defined limits and capacity
constraints; {@code false} otherwise.
+ */
+ public boolean canRunTask(Task task, Integer currentSlotsUsed, Integer
totalCapacity)
+ {
+ if (maxSlotRatioByType.isEmpty() && maxSlotCountByType.isEmpty()) {
+ return true;
+ }
+ return meetsTaskLimit(
+ task,
+ currentSlotsUsed,
+ totalCapacity
+ );
+ }
+
+ private boolean meetsTaskLimit(
+ Task task,
+ Integer currentSlotsUsed,
+ Integer totalCapacity
+ )
+ {
+ final Integer limit = getLimitForTask(task.getType(), totalCapacity);
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+
+ return hasCapacityBasedOnLimit(limit, currentSlotsUsed, requiredCapacity);
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Integer totalCapacity
+ )
+ {
+ Integer absoluteLimit = maxSlotCountByType.get(taskType);
+ Double ratioLimit = maxSlotRatioByType.get(taskType);
+
+ if (absoluteLimit == null && ratioLimit == null) {
+ return null;
+ }
+
+ if (ratioLimit != null) {
+ int ratioBasedLimit = calculateTaskCapacityFromRatio(ratioLimit, totalCapacity);
+ if (absoluteLimit != null) {
+ return Math.min(absoluteLimit, ratioBasedLimit);
+ } else {
+ return ratioBasedLimit;
+ }
+ } else {
+ return absoluteLimit;
+ }
+ }
+
+ private boolean hasCapacityBasedOnLimit(int limit, int currentCapacityUsed, int requiredCapacity)
+ {
+ return limit - currentCapacityUsed >= requiredCapacity;
+ }
+
+ private int calculateTaskCapacityFromRatio(double taskSlotRatio, int totalCapacity)
+ {
+ int ratioBasedCapacity = (int) Math.floor(taskSlotRatio * totalCapacity);
+ return Math.max(1, Math.min(ratioBasedCapacity, totalCapacity));
+ }
+
+ private void validateLimits(Map<String, Integer> taskCountLimits, Map<String, Double> taskRatios)
+ {
+ if (taskCountLimits != null && taskCountLimits.values().stream().anyMatch(val -> val < 0)) {
+ throw InvalidInput.exception(
+ "Max task slot count limit for any type must be greater than or equal to zero. Found[%s].",
+ taskCountLimits
+ );
+ } else if (taskRatios != null && taskRatios.values().stream().anyMatch(val -> val < 0 || val > 1)) {
+ throw InvalidInput.exception(
+ "Max task slot ratios should be in the interval of [0, 1]. Found[%s].",
+ taskRatios
+ );
+ }
+ }
+
+ @JsonProperty
+ public Map<String, Integer> getMaxSlotCountByType()
+ {
+ return maxSlotCountByType;
+ }
+
+ @JsonProperty
+ public Map<String, Double> getMaxSlotRatioByType()
+ {
+ return maxSlotRatioByType;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskLimits that = (TaskLimits) o;
+ return Objects.equals(maxSlotCountByType, that.maxSlotCountByType) && Objects.equals(
+ maxSlotRatioByType,
+ that.maxSlotRatioByType
+ );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(maxSlotCountByType, maxSlotRatioByType);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TaskLimits{" +
+ "maxSlotCountByType=" + maxSlotCountByType +
+ ", maxSlotRatioByType=" + maxSlotRatioByType +
+ '}';
+ }
+}
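The way the count and ratio limits combine can be illustrated with a minimal standalone sketch. It follows the logic visible in the diff above (ratio rounded down, clamped to at least 1 and at most the total capacity, then the lower of the two limits wins), but the class `TaskLimitSketch`, its method names, and the total capacity of 10 are hypothetical and are not part of the commit.

```java
import java.util.Map;

// Hypothetical, standalone sketch; it does not use or replace the Druid TaskLimits class.
final class TaskLimitSketch
{
  /** Effective slot limit for a task type, or null when no limit is configured for it. */
  static Integer effectiveLimit(
      Map<String, Integer> maxSlotCountByType,
      Map<String, Double> maxSlotRatioByType,
      String taskType,
      int totalCapacity
  )
  {
    final Integer absoluteLimit = maxSlotCountByType.get(taskType);
    final Double ratio = maxSlotRatioByType.get(taskType);
    if (ratio == null) {
      return absoluteLimit; // may be null, meaning no limit at all
    }
    // Round down, but never below 1 and never above the total capacity.
    final int fromRatio = Math.max(1, Math.min((int) Math.floor(ratio * totalCapacity), totalCapacity));
    return absoluteLimit == null ? fromRatio : Math.min(absoluteLimit, fromRatio);
  }

  /** True when a task needing requiredSlots still fits under the limit (null means unlimited). */
  static boolean canRun(Integer limit, int slotsUsed, int requiredSlots)
  {
    return limit == null || limit - slotsUsed >= requiredSlots;
  }

  public static void main(String[] args)
  {
    // Values taken from the Javadoc example; the total capacity is an assumption.
    final Map<String, Integer> counts = Map.of("index_parallel", 3, "query_controller", 5);
    final Map<String, Double> ratios = Map.of("index_parallel", 0.5, "query_controller", 0.25);
    final int totalCapacity = 10;

    System.out.println(effectiveLimit(counts, ratios, "index_parallel", totalCapacity));   // min(3, 5)
    System.out.println(effectiveLimit(counts, ratios, "query_controller", totalCapacity)); // min(5, 2)
    System.out.println(effectiveLimit(counts, ratios, "compact", totalCapacity));          // no limit configured
  }
}
```

Because of the clamp to 1, a very small ratio still leaves one slot for the task type in this sketch.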
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
index ffc30f55e64..4b867a4bb10 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerBehaviorConfig.java
@@ -40,7 +40,7 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
public interface WorkerBehaviorConfig
{
String CONFIG_KEY = "worker.config";
- WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy(null);
+ WorkerSelectStrategy DEFAULT_STRATEGY = new EqualDistributionWorkerSelectStrategy(null, null);
WorkerSelectStrategy getSelectStrategy();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
index c3832daae32..be4e8426008 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
@@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -54,10 +55,11 @@ public class WorkerSelectUtils
final Map<String, ImmutableWorkerInfo> allWorkers,
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
@Nullable final AffinityConfig affinityConfig,
- final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
+ final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector,
+ final TaskLimits taskLimits
)
{
- final Map<String, ImmutableWorkerInfo> runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig);
+ final Map<String, ImmutableWorkerInfo> runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig, taskLimits);
if (affinityConfig == null) {
// All runnable workers are valid.
@@ -105,10 +107,11 @@ public class WorkerSelectUtils
final Map<String, ImmutableWorkerInfo> allWorkers,
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
@Nullable final WorkerCategorySpec workerCategorySpec,
- final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector
+ final Function<ImmutableMap<String, ImmutableWorkerInfo>, ImmutableWorkerInfo> workerSelector,
+ final TaskLimits taskLimits
)
{
- final Map<String, ImmutableWorkerInfo> runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig);
+ final Map<String, ImmutableWorkerInfo> runnableWorkers = getRunnableWorkers(task, allWorkers, workerTaskRunnerConfig, taskLimits);
// select worker according to worker category spec
if (workerCategorySpec != null) {
@@ -145,9 +148,17 @@ public class WorkerSelectUtils
private static Map<String, ImmutableWorkerInfo> getRunnableWorkers(
final Task task,
final Map<String, ImmutableWorkerInfo> allWorkers,
- final WorkerTaskRunnerConfig workerTaskRunnerConfig
+ final WorkerTaskRunnerConfig workerTaskRunnerConfig,
+ final TaskLimits taskLimits
)
{
+ if (!taskLimits.canRunTask(
+ task,
+ getTotalCapacityUsedByType(allWorkers, task.getType()),
+ getTotalCapacity(allWorkers)
+ )) {
+ return Collections.emptyMap();
+ }
return allWorkers.values()
.stream()
.filter(worker -> worker.canRunTask(task, workerTaskRunnerConfig.getParallelIndexTaskSlotRatio())
@@ -193,4 +204,20 @@ public class WorkerSelectUtils
)
);
}
+
+ private static int getTotalCapacity(final Map<String, ImmutableWorkerInfo> allWorkers)
+ {
+ return allWorkers.values().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
+ }
+
+ private static int getTotalCapacityUsedByType(
+ final Map<String, ImmutableWorkerInfo> allWorkers,
+ final String taskType
+ )
+ {
+ return allWorkers.values()
+ .stream()
+ .mapToInt(workerInfo -> workerInfo.getCurrCapacityUsedByTaskType().getOrDefault(taskType, 0))
+ .sum();
+ }
}
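The gate added to `getRunnableWorkers` sums capacity and per-type usage over all workers before any individual worker is considered, so the limit acts cluster-wide. The sketch below illustrates that aggregation under simplifying assumptions: `ClusterSlotSketch` and `WorkerSnapshot` are hypothetical stand-ins for the Druid classes, only a count limit is modelled, and each task is assumed to need one slot. The scenario mirrors the tests added in this commit (two workers of capacity 2, a `noop` limit of 2).

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for per-worker state; not Druid's ImmutableWorkerInfo.
final class ClusterSlotSketch
{
  static final class WorkerSnapshot
  {
    final int capacity;
    final Map<String, Integer> usedByType;

    WorkerSnapshot(int capacity, Map<String, Integer> usedByType)
    {
      this.capacity = capacity;
      this.usedByType = usedByType;
    }
  }

  /** Sum of slot capacity over all workers. */
  static int totalCapacity(List<WorkerSnapshot> workers)
  {
    return workers.stream().mapToInt(w -> w.capacity).sum();
  }

  /** Sum of slots currently used by the given task type over all workers. */
  static int totalUsedByType(List<WorkerSnapshot> workers, String taskType)
  {
    return workers.stream().mapToInt(w -> w.usedByType.getOrDefault(taskType, 0)).sum();
  }

  /** True when a task needing requiredSlots fits under the cluster-wide count limit. */
  static boolean fitsUnderLimit(List<WorkerSnapshot> workers, String taskType, int limit, int requiredSlots)
  {
    return limit - totalUsedByType(workers, taskType) >= requiredSlots;
  }

  public static void main(String[] args)
  {
    final List<WorkerSnapshot> oneBusy = List.of(
        new WorkerSnapshot(2, Map.of()),
        new WorkerSnapshot(2, Map.of("noop", 1))
    );
    final List<WorkerSnapshot> bothBusy = List.of(
        new WorkerSnapshot(2, Map.of("noop", 1)),
        new WorkerSnapshot(2, Map.of("noop", 1))
    );

    System.out.println(fitsUnderLimit(oneBusy, "noop", 2, 1));  // one slot of the limit is still free
    System.out.println(fitsUnderLimit(bothBusy, "noop", 2, 1)); // limit already reached
  }
}
```

When the gate fails, the real method returns an empty map, so no worker is selected even if individual workers still have free slots.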
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
index e373eb982a3..122e9892dc8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -201,6 +203,7 @@ public class ImmutableWorkerInfoTest
),
3,
0,
+ Collections.emptyMap(),
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z"),
@@ -211,6 +214,7 @@ public class ImmutableWorkerInfoTest
),
2,
0,
+ Collections.emptyMap(),
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:02Z"),
@@ -225,6 +229,7 @@ public class ImmutableWorkerInfoTest
new Worker("http", "testWorker2", "192.0.0.1", 10, "v1", WorkerConfig.DEFAULT_CATEGORY),
6,
0,
+ Collections.emptyMap(),
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:02Z")
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 8207584f213..7bbd6ef5e37 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -632,7 +632,7 @@ public class RemoteTaskRunnerTest
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
- new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null)
+ new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null, null), null)
);
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
index 8a1232b9571..eaba6f9e6f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
@@ -92,7 +92,7 @@ public class PendingTaskBasedProvisioningStrategyTest
workerConfig = new AtomicReference<>(
new DefaultWorkerBehaviorConfig(
- new FillCapacityWorkerSelectStrategy(null),
+ new FillCapacityWorkerSelectStrategy(null, null),
autoScaler
)
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 91b0778c950..3e5c862d7a6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1818,7 +1818,7 @@ public class HttpRemoteTaskRunnerTest
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(HttpClient.class),
- DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null))),
+ DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null, null), null))),
new TestProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createMock(TaskStorage.class),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
index 0641d3a8d91..1e81be9cc97 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithAffinityWorkerSelectStrategyTest.java
@@ -32,7 +32,10 @@ import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
public class EqualDistributionWithAffinityWorkerSelectStrategyTest
{
@@ -40,7 +43,8 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTask()
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost1", "localhost2", "localhost3")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost1", "localhost2", "localhost3")), false),
+ null
);
NoopTask noopTask = NoopTask.forDatasource("foo");
@@ -85,7 +89,8 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTaskWithNulls()
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false),
+ null
);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -115,7 +120,8 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
public void testIsolation()
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false),
+ null
);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -139,7 +145,8 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
{
final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false),
+ null
);
final WorkerSelectStrategy strategy2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(strategy),
@@ -147,4 +154,138 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
);
Assert.assertEquals(strategy, strategy2);
}
+
+ @Test
+ public void testFindWorkerForTaskWithGlobalLimits()
+ {
+ Map<String, Integer> taskLimits = new HashMap<>();
+ taskLimits.put("noop", 2);
+
+ Map<String, Integer> capacityUsed = new HashMap<>();
+ capacityUsed.put("noop", 1);
+ EqualDistributionWorkerSelectStrategy strategy = new
EqualDistributionWithAffinityWorkerSelectStrategy(
+ null,
+ new TaskLimits(taskLimits, null)
+ );
+
+ NoopTask noopTask = NoopTask.forDatasource("foo");
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+ Assert.assertNotNull(worker);
+
+ ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+
+ Assert.assertNull(worker1);
+
+ }
+
+ @Test
+ public void testFindWorkerForTaskWithGlobalRatios()
+ {
+ Map<String, Double> taskRatios = new HashMap<>();
+ taskRatios.put("noop", 0.5);
+
+ Map<String, Integer> capacityUsed = new HashMap<>();
+ capacityUsed.put("noop", 1);
+ EqualDistributionWorkerSelectStrategy strategy = new
EqualDistributionWithAffinityWorkerSelectStrategy(
+ null,
+ new TaskLimits(null, taskRatios)
+ );
+
+ NoopTask noopTask = NoopTask.forDatasource("foo");
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+ Assert.assertNotNull(worker);
+
+ ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ Set.of(),
+ Set.of(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+
+ Assert.assertNull(worker1);
+
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
index 99a690a738d..6ac04a8920f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
@@ -176,8 +176,8 @@ public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest
private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)
{
- final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy = new EqualDistributionWithCategorySpecWorkerSelectStrategy(
- workerCategorySpec);
+ final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+ new EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
index dac25a3c9e3..58280641645 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java
@@ -69,7 +69,7 @@ public class EqualDistributionWorkerSelectStrategyTest
@Test
public void testFindWorkerForTask()
{
- final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
+ final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null, null);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
@@ -97,7 +97,7 @@ public class EqualDistributionWorkerSelectStrategyTest
@Test
public void testFindWorkerForTaskWhenSameCurrCapacityUsed()
{
- final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
+ final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null, null);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
@@ -126,7 +126,7 @@ public class EqualDistributionWorkerSelectStrategyTest
public void testOneDisableWorkerDifferentUsedCapacity()
{
String disabledVersion = "";
- final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
+ final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null, null);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
@@ -155,7 +155,7 @@ public class EqualDistributionWorkerSelectStrategyTest
public void testOneDisableWorkerSameUsedCapacity()
{
String disabledVersion = "";
- final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null);
+ final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null, null);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
@@ -190,7 +190,8 @@ public class EqualDistributionWorkerSelectStrategyTest
"bar", ImmutableSet.of("nonexistent-worker")
),
false
- )
+ ),
+ null
);
ImmutableWorkerInfo workerFoo = strategy.findWorkerForTask(
@@ -226,7 +227,8 @@ public class EqualDistributionWorkerSelectStrategyTest
"bar", ImmutableSet.of("nonexistent-worker")
),
true
- )
+ ),
+ null
);
ImmutableWorkerInfo workerFoo = strategy.findWorkerForTask(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
index 93f6c39b0f0..207c6e43dda 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithAffinityWorkerSelectStrategyTest.java
@@ -30,7 +30,9 @@ import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
public class FillCapacityWithAffinityWorkerSelectStrategyTest
{
@@ -38,7 +40,8 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTask()
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false),
+ null
);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -68,7 +71,8 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTaskWithNulls()
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false),
+ null
);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -98,7 +102,8 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testIsolation()
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
- new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false)
+ new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost")), false),
+ null
);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
@@ -116,4 +121,138 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
);
Assert.assertNull(worker);
}
+
+ @Test
+ public void testFindWorkerForTaskWithGlobalLimits()
+ {
+ Map<String, Integer> taskLimits = new HashMap<>();
+ taskLimits.put("noop", 2);
+
+ Map<String, Integer> capacityUsed = new HashMap<>();
+ capacityUsed.put("noop", 1);
+ FillCapacityWorkerSelectStrategy strategy = new
FillCapacityWorkerSelectStrategy(
+ null,
+ new TaskLimits(taskLimits, null)
+ );
+
+ NoopTask noopTask = NoopTask.forDatasource("foo");
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+ Assert.assertNotNull(worker);
+
+ ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+
+ Assert.assertNull(worker1);
+
+ }
+
+ @Test
+ public void testFindWorkerForTaskWithGlobalRatios()
+ {
+ Map<String, Double> taskRatios = new HashMap<>();
+ taskRatios.put("noop", 0.5);
+
+ Map<String, Integer> capacityUsed = new HashMap<>();
+ capacityUsed.put("noop", 1);
+ FillCapacityWorkerSelectStrategy strategy = new
FillCapacityWorkerSelectStrategy(
+ null,
+ new TaskLimits(null, taskRatios)
+ );
+
+ NoopTask noopTask = NoopTask.forDatasource("foo");
+ ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+ Assert.assertNotNull(worker);
+
+ ImmutableWorkerInfo worker1 = strategy.findWorkerForTask(
+ new RemoteTaskRunnerConfig(),
+ ImmutableMap.of(
+ "localhost0",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ ),
+ "localhost1",
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost1", "localhost1", 2, "v1",
WorkerConfig.DEFAULT_CATEGORY), 0,
+ 0,
+ capacityUsed,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ )
+ ),
+ noopTask
+ );
+
+ Assert.assertNull(worker1);
+
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
index bb4795d029e..880ef743dca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
@@ -176,8 +176,8 @@ public class FillCapacityWithCategorySpecWorkerSelectStrategyTest
private ImmutableWorkerInfo selectWorker(WorkerCategorySpec workerCategorySpec)
{
- final FillCapacityWithCategorySpecWorkerSelectStrategy strategy = new FillCapacityWithCategorySpecWorkerSelectStrategy(
- workerCategorySpec);
+ final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+ new FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),