This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 67abdc2012f Add optional plugins to basic cost function in
CostBasedAutoScaler (#18976)
67abdc2012f is described below
commit 67abdc2012f51b31f288aa7feb5c71baf7cb9a4c
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Thu Feb 5 18:04:23 2026 +0200
Add optional plugins to basic cost function in CostBasedAutoScaler (#18976)
Changes:
- separate the pure cost function logic, making all additional behavior
opt-in via config;
- `scaleDownBarrier` has been replaced by `minScaleDownDelay`, which is now a
`Duration`;
- changes to high-lag fast scale-up: a logarithmic scaling formula for idle
decay under high lag, plus optional task count boundaries.
Details:
This change replaces the sqrt-based scaling formula with a logarithmic
formula that provides more aggressive emergency recovery at low task counts
and lag in the millions.
Idle decay: `ln(lagSeverity) / ln(maxSeverity)`. Less aggressive, and it
scales well as lag grows.
The formula `K = P/(6.4*sqrt(C))` gives small task counts large K values
(emergency recovery), while large task counts get smaller K values
(stability).
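For illustration, the two formulas above condense into the following sketch.
The standalone method shapes and names are illustrative only (the committed
logic lives in CostBasedAutoScaler.computeExtraPPTIncrease and
WeightedCostFunction); the constants 6.4 and maxSeverity = 5 match the patch.

    // Burst scale-up: deltaTasks = K * ln(lagSeverity),
    // where K = partitionCount / (6.4 * sqrt(currentTaskCount)).
    static double deltaTasks(int partitionCount, int currentTaskCount,
                             double lagPerPartition, double highLagThreshold)
    {
      final double lagSeverity = lagPerPartition / highLagThreshold;
      if (lagSeverity <= 1.0) {
        return 0.0; // ln(1) = 0: no burst at or below the threshold
      }
      final double k = (partitionCount / 6.4) / Math.sqrt(currentTaskCount);
      return k * Math.log(lagSeverity);
    }

    // Idle decay: lagBusyFactor = ln(lagSeverity) / ln(maxSeverity), capped at 1.
    static double lagBusyFactor(double lagSeverity, double maxSeverity)
    {
      return Math.min(1.0, Math.log(lagSeverity) / Math.log(maxSeverity));
    }

For example, with 48 partitions, 1 running task, a 50,000 per-partition
threshold and 100,000 lag per partition, K = 7.5 and deltaTasks = 7.5 * ln(2),
about 5.2, lifting the target from 1 to roughly 6 tasks in one step; at 24
running tasks the same lag gives K of about 1.5 and a delta of about 1.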
---
docs/ingestion/supervisor.md | 12 +-
.../CostBasedAutoScalerIntegrationTest.java | 13 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 164 ++++++---
.../autoscaler/CostBasedAutoScalerConfig.java | 98 +++--
.../autoscaler/WeightedCostFunction.java | 67 ++--
.../autoscaler/CostBasedAutoScalerConfigTest.java | 48 ++-
.../CostBasedAutoScalerHighLagScalingTest.java | 154 ++++++++
.../autoscaler/CostBasedAutoScalerMockTest.java | 66 ++--
.../autoscaler/CostBasedAutoScalerTest.java | 410 ++++++++++++---------
.../autoscaler/WeightedCostFunctionTest.java | 88 ++++-
10 files changed, 733 insertions(+), 387 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 00256e4ea9c..95991d4d5cf 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -203,13 +203,15 @@ Note: Kinesis is not supported yet, support is in
progress.
The following table outlines the configuration properties related to the
`costBased` autoscaler strategy:
| Property|Description|Required|Default|
-|---------|---------------------------------------------------|---|-----|
-|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered. | No | 60000 |
-|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25|
+|---------|-----------|--------|-------|
+|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered. | No | 600000 |
+|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
|`defaultProcessingRate`|A planned processing rate per task, required for first cost estimations. | No | 1000 |
-|`scaleDownBarrier`| A number of successful scale down attempts which should be skipped to prevent the auto-scaler from scaling down tasks immediately. | No | 5 |
-|`scaleDownDuringTaskRolloverOnly`| Indicates whether task scaling down is limited to periods during task rollovers only. | No | False |
+|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
+|`highLagThreshold`|Per-partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
+|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
+|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
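For reference, a `costBased` configuration corresponding to the table above
might look like the sketch below. Only the documented properties plus the
common task-count bounds are shown; the `autoScalerStrategy` discriminator and
the placement under `ioConfig.autoScalerConfig` mirror the `lagBased`
autoscaler and are assumptions here, and all values are purely illustrative.

    "autoScalerConfig": {
      "enableTaskAutoScaler": true,
      "autoScalerStrategy": "costBased",
      "taskCountMin": 1,
      "taskCountMax": 20,
      "scaleActionPeriodMillis": 600000,
      "lagWeight": 0.25,
      "idleWeight": 0.75,
      "defaultProcessingRate": 1000,
      "useTaskCountBoundaries": false,
      "highLagThreshold": 25000,
      "minScaleDownDelay": "PT30M",
      "scaleDownDuringTaskRolloverOnly": false
    }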
The following example shows a supervisor spec with `lagBased` autoscaler:
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index ad23e0b6fbe..1138aacf6cf 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -127,13 +128,13 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
.taskCountMin(1)
.taskCountMax(100)
.taskCountStart(initialTaskCount)
- .scaleActionPeriodMillis(1500)
- .minTriggerScaleActionFrequencyMillis(3000)
+ .scaleActionPeriodMillis(1900)
+ .minTriggerScaleActionFrequencyMillis(2000)
// Weight configuration: strongly favor lag reduction over idle time
.lagWeight(0.9)
.idleWeight(0.1)
.scaleDownDuringTaskRolloverOnly(false)
- .scaleDownBarrier(1)
+ .minScaleDownDelay(Duration.ZERO)
.build();
final KafkaSupervisorSpec spec =
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig,
initialTaskCount);
@@ -147,10 +148,10 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
.hasDimension(DruidMetrics.DATASOURCE,
dataSource));
// Wait for autoscaler to emit optimalTaskCount metric indicating
scale-down
- // We expect the optimal task count to 4
+ // We expect the optimal task count to be at most 6
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
- .hasValueMatching(Matchers.equalTo(6L))
+ .hasValueMatching(Matchers.lessThanOrEqualTo(6L))
);
// Suspend the supervisor
@@ -229,7 +230,7 @@ public class CostBasedAutoScalerIntegrationTest extends
EmbeddedClusterTestBase
.idleWeight(0.9)
.scaleDownDuringTaskRolloverOnly(true)
// Do not slow scale-downs
- .scaleDownBarrier(0)
+ .minScaleDownDelay(Duration.ZERO)
.build();
final KafkaSupervisorSpec spec =
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig,
initialTaskCount);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 060240014be..8bf2e8ab787 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -27,6 +27,7 @@ import
org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -55,31 +56,20 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
{
private static final EmittingLogger log = new
EmittingLogger(CostBasedAutoScaler.class);
- private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
- private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK =
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
- /**
- * Defines the step size used for evaluating lag when computing scaling
actions.
- * This constant helps control the granularity of lag considerations in
scaling decisions,
- * ensuring smoother transitions between scaled states and avoiding abrupt
changes in task counts.
- */
- private static final int LAG_STEP = 50_000;
- /**
- * This parameter fine-tunes autoscaling behavior by adding extra flexibility
- * when calculating maximum allowable partitions per task in response to lag,
- * which must be processed as fast, as possible.
- * It acts as a foundational factor that balances the responsiveness and
stability of autoscaling.
- */
- private static final int BASE_RAW_EXTRA = 5;
- // Base PPT lag threshold allowing to activate a burst scaleup to eliminate
high lag.
- static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 25_000;
- // Extra PPT lag threshold allowing activation of even more aggressive
scaleup to eliminate high lag,
- // also enabling lag-amplified idle calculation decay in the cost function
(to reduce idle weight).
- static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
-
public static final String LAG_COST_METRIC =
"task/autoScaler/costBased/lagCost";
public static final String IDLE_COST_METRIC =
"task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC =
"task/autoScaler/costBased/optimalTaskCount";
+ static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
+ static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK =
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+
+ /**
+ * Divisor for partition count in the K formula: K = (partitionCount /
K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
+ * This controls how aggressive the scaling is relative to partition count.
+ * That value was chosen by carefully analyzing the math model behind the
implementation.
+ */
+ static final double K_PARTITION_DIVISOR = 6.4;
+
private final String supervisorId;
private final SeekableStreamSupervisor supervisor;
private final ServiceEmitter emitter;
@@ -90,7 +80,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
private final WeightedCostFunction costFunction;
private volatile CostMetrics lastKnownMetrics;
- private int scaleDownCounter = 0;
+ private volatile long lastScaleActionTimeMillis = -1;
public CostBasedAutoScaler(
SeekableStreamSupervisor supervisor,
@@ -106,7 +96,6 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
this.emitter = emitter;
this.costFunction = new WeightedCostFunction();
-
this.autoscalerExecutor =
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
+
StringUtils.encodeForFormat(spec.getId()));
this.metricBuilder = ServiceMetricEvent.builder()
@@ -117,6 +106,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
);
}
+ @SuppressWarnings("unchecked")
@Override
public void start()
{
@@ -153,7 +143,6 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
if (config.isScaleDownOnTaskRolloverOnly()) {
return computeOptimalTaskCount(lastKnownMetrics);
} else {
- scaleDownCounter = 0;
return -1;
}
}
@@ -166,16 +155,16 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
// Perform scale-up actions; scale-down actions only if configured.
int taskCount = -1;
- if (optimalTaskCount > currentTaskCount) {
+ if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
- scaleDownCounter = 0; // Nullify the scaleDown counter after a
successful scaleup too.
+ lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("New task count [%d] on supervisor [%s], scaling up",
taskCount, supervisorId);
} else if (!config.isScaleDownOnTaskRolloverOnly()
+ && isScaleActionAllowed()
&& optimalTaskCount < currentTaskCount
- && optimalTaskCount > 0
- && ++scaleDownCounter >= config.getScaleDownBarrier()) {
+ && optimalTaskCount > 0) {
taskCount = optimalTaskCount;
- scaleDownCounter = 0;
+ lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("New task count [%d] on supervisor [%s], scaling down",
taskCount, supervisorId);
} else {
log.info("No scaling required for supervisor [%s]", supervisorId);
@@ -217,7 +206,9 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
currentTaskCount,
(long) metrics.getAggregateLag(),
config.getTaskCountMin(),
- config.getTaskCountMax()
+ config.getTaskCountMax(),
+ config.shouldUseTaskCountBoundaries(),
+ config.getHighLagThreshold()
);
if (validTaskCounts.length == 0) {
@@ -228,11 +219,20 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
int optimalTaskCount = -1;
CostResult optimalCost = new CostResult();
+ log.info(
+ "Current metrics: avgPartitionLag[%.1f], pollIdleRatio[%.1f],
lagWeight[%.1f], idleWeight[%.1f]",
+ metrics.getAggregateLag(),
+ metrics.getPollIdleRatio(),
+ config.getLagWeight(),
+ config.getIdleWeight()
+ );
+
for (int taskCount : validTaskCounts) {
CostResult costResult = costFunction.computeCost(metrics, taskCount,
config);
double cost = costResult.totalCost();
- log.debug(
- "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+
+ log.info(
+ "Proposed task count[%d] has total cost[%.4f] = lagCost[%.4f] +
idleCost[%.4f].",
taskCount,
cost,
costResult.lagCost(),
@@ -268,18 +268,19 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
}
/**
- * Generates valid task counts based on partitions-per-task ratios and
lag-driven PPT relaxation.
- * This enables gradual scaling and avoids large jumps.
- * Limits the range of task counts considered to avoid excessive computation.
+ * Generates valid task counts based on partitions-per-task ratios.
*
* @return sorted list of valid task counts within bounds
*/
+ @SuppressWarnings({"ReassignedVariable"})
static int[] computeValidTaskCounts(
int partitionCount,
int currentTaskCount,
double aggregateLag,
int taskCountMin,
- int taskCountMax
+ int taskCountMax,
+ boolean isTaskCountBoundariesEnabled,
+ int highLagThreshold
)
{
if (partitionCount <= 0 || currentTaskCount <= 0) {
@@ -288,22 +289,33 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
IntSet result = new IntArraySet();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
- final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
- aggregateLag,
- partitionCount,
- currentTaskCount,
- taskCountMax
- );
- final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK +
extraIncrease;
// Minimum partitions per task correspond to the maximum number of tasks
(scale up) and vice versa.
- final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask -
effectiveMaxIncrease);
- final int maxPartitionsPerTask = Math.min(
- partitionCount,
- currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
- );
+ int minPartitionsPerTask = Math.min(1, partitionCount / taskCountMax);
+ int maxPartitionsPerTask = Math.max(partitionCount, partitionCount /
taskCountMin);
- for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >=
minPartitionsPerTask; partitionsPerTask--) {
+ if (isTaskCountBoundariesEnabled) {
+ maxPartitionsPerTask = Math.min(
+ partitionCount,
+ currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
+ );
+
+ int extraIncrease = 0;
+ if (highLagThreshold > 0) {
+ extraIncrease = computeExtraPPTIncrease(
+ highLagThreshold,
+ aggregateLag,
+ partitionCount,
+ currentTaskCount,
+ taskCountMax
+ );
+ }
+ int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK +
extraIncrease;
+ minPartitionsPerTask = Math.max(minPartitionsPerTask,
currentPartitionsPerTask - effectiveMaxIncrease);
+ }
+
+ for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >=
minPartitionsPerTask
+ && partitionsPerTask !=
0; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) /
partitionsPerTask;
if (taskCount >= taskCountMin && taskCount <= taskCountMax) {
result.add(taskCount);
@@ -314,32 +326,46 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
/**
* Computes extra allowed increase in partitions-per-task in scenarios when
the average per-partition lag
- * is above the configured threshold. By default, it is {@code
EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD}.
- * Generally, one of the autoscaler priorities is to keep the lag as close
to zero as possible.
+ * is above the configured threshold.
+ * <p>
+ * This uses a logarithmic formula for consistent absolute growth:
+ * {@code deltaTasks = K * ln(lagSeverity)}
+ * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}
+ * <p>
+ * This ensures that small task counts get a massive relative boost,
+ * while large task counts receive more measured, stable increases.
*/
- static int computeExtraMaxPartitionsPerTaskIncrease(
+ static int computeExtraPPTIncrease(
+ double lagThreshold,
double aggregateLag,
int partitionCount,
int currentTaskCount,
int taskCountMax
)
{
- if (partitionCount <= 0 || taskCountMax <= 0) {
+ if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) {
return 0;
}
final double lagPerPartition = aggregateLag / partitionCount;
- if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+ if (lagPerPartition < lagThreshold) {
return 0;
}
- int rawExtra = BASE_RAW_EXTRA;
- if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) {
- rawExtra += (int) ((lagPerPartition -
AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP);
- }
+ final double lagSeverity = lagPerPartition / lagThreshold;
+
+ // Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1
+ // The first factor decreases with sqrt(currentTaskCount): aggressive when
small, conservative when large
+ final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) /
Math.sqrt(currentTaskCount) * Math.log(
+ lagSeverity);
+
+ final double targetTaskCount = Math.min(taskCountMax, (double)
currentTaskCount + deltaTasks);
- final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount
/ taskCountMax);
- return (int) (rawExtra * headroomRatio);
+ // Compute precise PPT reduction to avoid early integer truncation
artifacts
+ final double currentPPT = (double) partitionCount / currentTaskCount;
+ final double targetPPT = (double) partitionCount / targetTaskCount;
+
+ return Math.max(0, (int) Math.floor(currentPPT - targetPPT));
}
/**
@@ -464,4 +490,22 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
);
}
+ /**
+ * Determines if a scale action is currently allowed based on the elapsed
time
+ * since the last scale action and the configured minimum scale-down delay.
+ */
+ private boolean isScaleActionAllowed()
+ {
+ if (lastScaleActionTimeMillis < 0) {
+ return true;
+ }
+
+ final long barrierMillis = config.getMinScaleDownDelay().getMillis();
+ if (barrierMillis <= 0) {
+ return true;
+ }
+
+ final long elapsedMillis = DateTimes.nowUtc().getMillis() -
lastScaleActionTimeMillis;
+ return elapsedMillis >= barrierMillis;
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index d835fea5157..cb791abefb3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -29,6 +29,7 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -48,12 +49,12 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
static final double DEFAULT_LAG_WEIGHT = 0.25;
static final double DEFAULT_IDLE_WEIGHT = 0.75;
static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec
per task as default
- static final int DEFAULT_SCALE_DOWN_BARRIER = 5; // We delay scale down by 5
* DEFAULT_SCALE_ACTION_PERIOD_MILLIS
+ static final Duration DEFAULT_MIN_SCALE_DELAY =
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
private final boolean enableTaskAutoScaler;
private final int taskCountMax;
private final int taskCountMin;
- private Integer taskCountStart;
+ private final Integer taskCountStart;
private final long minTriggerScaleActionFrequencyMillis;
private final Double stopTaskCountRatio;
private final long scaleActionPeriodMillis;
@@ -61,17 +62,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private final double lagWeight;
private final double idleWeight;
private final double defaultProcessingRate;
- /**
- * Represents the threshold value used to prevent the auto-scaler from
scaling down tasks immediately,
- * when the computed cost-based metrics fall below this barrier.
- * A higher value implies a more conservative scaling down behavior,
ensuring that tasks
- * are not prematurely terminated in scenarios of potential workload spikes
or insufficient cost savings.
- */
- private final int scaleDownBarrier;
- /**
- * Indicates whether task scaling down is limited to periods during task
rollovers only.
- * If set to {@code false}, allows scaling down during normal task run time.
- */
+ private final boolean useTaskCountBoundaries;
+ private final int highLagThreshold;
+ private final Duration minScaleDownDelay;
private final boolean scaleDownDuringTaskRolloverOnly;
@JsonCreator
@@ -86,7 +79,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
@Nullable @JsonProperty("lagWeight") Double lagWeight,
@Nullable @JsonProperty("idleWeight") Double idleWeight,
@Nullable @JsonProperty("defaultProcessingRate") Double
defaultProcessingRate,
- @Nullable @JsonProperty("scaleDownBarrier") Integer scaleDownBarrier,
+ @Nullable @JsonProperty("useTaskCountBoundaries") Boolean
useTaskCountBoundaries,
+ @Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
+ @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean
scaleDownDuringTaskRolloverOnly
)
{
@@ -105,7 +100,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate,
DEFAULT_PROCESSING_RATE);
- this.scaleDownBarrier = Configs.valueOrDefault(scaleDownBarrier,
DEFAULT_SCALE_DOWN_BARRIER);
+ this.useTaskCountBoundaries =
Configs.valueOrDefault(useTaskCountBoundaries, false);
+ this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
+ this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay,
DEFAULT_MIN_SCALE_DELAY);
this.scaleDownDuringTaskRolloverOnly =
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
if (this.enableTaskAutoScaler) {
@@ -133,7 +130,7 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >=
0");
Preconditions.checkArgument(this.defaultProcessingRate > 0,
"defaultProcessingRate must be > 0");
- Preconditions.checkArgument(this.scaleDownBarrier >= 0, "scaleDownBarrier
must be >= 0");
+ Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0,
"minScaleDownDelay must be >= 0");
}
/**
@@ -212,12 +209,39 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return defaultProcessingRate;
}
+ /**
+ * Enables or disables the use of task count boundaries derived from the
current partitions-per-task (PPT) ratio.
+ */
+ @JsonProperty("useTaskCountBoundaries")
+ public boolean shouldUseTaskCountBoundaries()
+ {
+ return useTaskCountBoundaries;
+ }
+
+ /**
+ * Per-partition lag threshold that activates a burst scale-up to eliminate
high lag.
+ */
+ @JsonProperty("highLagThreshold")
+ public int getHighLagThreshold()
+ {
+ return highLagThreshold;
+ }
+
+ /**
+ * Represents the minimum duration between successful scale actions.
+ * A higher value implies a more conservative scaling behavior, ensuring
that tasks
+ * are not scaled too frequently during workload fluctuations.
+ */
@JsonProperty
- public int getScaleDownBarrier()
+ public Duration getMinScaleDownDelay()
{
- return scaleDownBarrier;
+ return minScaleDownDelay;
}
+ /**
+ * Indicates whether task scaling down is limited to periods during task
rollovers only.
+ * If set to {@code false}, allows scaling down during normal task run time.
+ */
@JsonProperty("scaleDownDuringTaskRolloverOnly")
public boolean isScaleDownOnTaskRolloverOnly()
{
@@ -250,10 +274,12 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
&& Double.compare(that.lagWeight, lagWeight) == 0
&& Double.compare(that.idleWeight, idleWeight) == 0
&& Double.compare(that.defaultProcessingRate,
defaultProcessingRate) == 0
- && scaleDownBarrier == that.scaleDownBarrier
+ && useTaskCountBoundaries == that.useTaskCountBoundaries
+ && Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
&& scaleDownDuringTaskRolloverOnly ==
that.scaleDownDuringTaskRolloverOnly
&& Objects.equals(taskCountStart, that.taskCountStart)
- && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
+ && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio)
+ && highLagThreshold == that.highLagThreshold;
}
@Override
@@ -270,7 +296,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
lagWeight,
idleWeight,
defaultProcessingRate,
- scaleDownBarrier,
+ useTaskCountBoundaries,
+ highLagThreshold,
+ minScaleDownDelay,
scaleDownDuringTaskRolloverOnly
);
}
@@ -289,7 +317,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
", lagWeight=" + lagWeight +
", idleWeight=" + idleWeight +
", defaultProcessingRate=" + defaultProcessingRate +
- ", scaleDownBarrier=" + scaleDownBarrier +
+ ", useTaskCountBoundaries=" + useTaskCountBoundaries +
+ ", highLagThreshold=" + highLagThreshold +
+ ", minScaleDownDelay=" + minScaleDownDelay +
", scaleDownDuringTaskRolloverOnly=" +
scaleDownDuringTaskRolloverOnly +
'}';
}
@@ -310,7 +340,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private Double lagWeight;
private Double idleWeight;
private Double defaultProcessingRate;
- private Integer scaleDownBarrier;
+ private Boolean useTaskCountBoundaries;
+ private Integer highLagThreshold;
+ private Duration minScaleDownDelay;
private Boolean scaleDownDuringTaskRolloverOnly;
private Builder()
@@ -377,9 +409,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return this;
}
- public Builder scaleDownBarrier(int scaleDownBarrier)
+ public Builder minScaleDownDelay(Duration minScaleDownDelay)
{
- this.scaleDownBarrier = scaleDownBarrier;
+ this.minScaleDownDelay = minScaleDownDelay;
return this;
}
@@ -389,6 +421,18 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return this;
}
+ public Builder useTaskCountBoundaries(boolean useTaskCountBoundaries)
+ {
+ this.useTaskCountBoundaries = useTaskCountBoundaries;
+ return this;
+ }
+
+ public Builder highLagThreshold(int highLagThreshold)
+ {
+ this.highLagThreshold = highLagThreshold;
+ return this;
+ }
+
public CostBasedAutoScalerConfig build()
{
return new CostBasedAutoScalerConfig(
@@ -402,7 +446,9 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
lagWeight,
idleWeight,
defaultProcessingRate,
- scaleDownBarrier,
+ useTaskCountBoundaries,
+ highLagThreshold,
+ minScaleDownDelay,
scaleDownDuringTaskRolloverOnly
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 8a375955691..01b35242ed8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -30,24 +30,11 @@ public class WeightedCostFunction
{
private static final Logger log = new Logger(WeightedCostFunction.class);
/**
- * Represents the maximum multiplier factor applied to amplify lag-based
costs in the cost computation process.
- * This value is used to cap the lag amplification effect to prevent
excessively high cost inflation
- * caused by significant partition lag.
- * It ensures that lag-related adjustments remain bounded within a
reasonable range for stability of
- * cost-based auto-scaling decisions.
+ * The lag severity at which lagBusyFactor reaches 1.0 (full idle
suppression).
+ * lagSeverity is defined as lagPerPartition / highLagThreshold.
+ * At severity=1 (threshold), lagBusyFactor=0. At severity=MAX,
lagBusyFactor=1.0.
*/
- private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0;
- private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L;
- /**
- * It is used to calculate the denominator for the ramp formula in the cost
- * computation logic. This value represents the difference between the
maximum lag per
- * partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling
activation
- * lag threshold
(CostBasedAutoScaler.EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD).
- * <p>
- * It is impacting how the cost model evaluates scaling decisions during
high-lag sceario.
- */
- private static final double RAMP_DENOMINATOR =
- LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double)
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
+ private static final int LAG_AMPLIFICATION_MAX_SEVERITY = 5;
/**
* Computes cost for a given task count using compute time metrics.
@@ -64,7 +51,11 @@ public class WeightedCostFunction
* @return CostResult containing totalCost, lagCost, and idleCost,
* or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
*/
- public CostResult computeCost(CostMetrics metrics, int proposedTaskCount,
CostBasedAutoScalerConfig config)
+ public CostResult computeCost(
+ CostMetrics metrics,
+ int proposedTaskCount,
+ CostBasedAutoScalerConfig config
+ )
{
if (metrics == null || config == null || proposedTaskCount <= 0 ||
metrics.getPartitionCount() <= 0) {
return new CostResult(Double.POSITIVE_INFINITY,
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
@@ -86,7 +77,7 @@ public class WeightedCostFunction
lagRecoveryTime = metrics.getAggregateLag() / (proposedTaskCount *
avgProcessingRate);
}
- final double predictedIdleRatio = estimateIdleRatio(metrics,
proposedTaskCount);
+ final double predictedIdleRatio = estimateIdleRatio(metrics,
proposedTaskCount, config.getHighLagThreshold());
final double idleCost = proposedTaskCount *
metrics.getTaskDurationSeconds() * predictedIdleRatio;
final double lagCost = config.getLagWeight() * lagRecoveryTime;
final double weightedIdleCost = config.getIdleWeight() * idleCost;
@@ -107,19 +98,16 @@ public class WeightedCostFunction
/**
* Estimates the idle ratio for a proposed task count.
- * Includes lag-based adjustment to eliminate high lag and
- * reduce predicted idle when work exists.
- * <p>
- * Formulas:
- * {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)}
- * {@code lagBusyFactor = 1 - exp(-lagPerTask / LAG_SCALE_FACTOR)}
- * {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)}
+ * Includes lag-based adjustment to suppress predicted idle when lag exceeds
the threshold,
+ * encouraging scale-up when there is real work to do.
+ * The algorithm is aligned with {@code computeExtraPPTIncrease}, so the two
can work in tandem when enabled.
*
* @param metrics current system metrics containing idle ratio and task
count
* @param taskCount target task count to estimate an idle ratio for
* @return estimated idle ratio in range [0.0, 1.0]
*/
- private double estimateIdleRatio(CostMetrics metrics, int taskCount)
+ @SuppressWarnings("ExtractMethodRecommender")
+ private double estimateIdleRatio(CostMetrics metrics, int taskCount, int
highLagThreshold)
{
final double currentPollIdleRatio = metrics.getPollIdleRatio();
@@ -138,24 +126,13 @@ public class WeightedCostFunction
final double taskRatio = (double) taskCount / currentTaskCount;
final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 -
busyFraction / taskRatio));
- // Lag-based adjustment: more work per task → less idle
- final double lagPerTask = metrics.getAggregateLag() / taskCount;
- double lagBusyFactor = 1.0 - Math.exp(-lagPerTask /
CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD);
- final int partitionCount = metrics.getPartitionCount();
-
- if (partitionCount > 0) {
- final double lagPerPartition = metrics.getAggregateLag() /
partitionCount;
- // Lag-amplified idle decay
- if (lagPerPartition >=
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
- double ramp = Math.max(0.0,
- (lagPerPartition -
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD)
- / RAMP_DENOMINATOR
- );
- ramp = Math.min(1.0, ramp);
-
- final double multiplier = 1.0 + ramp *
(LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0);
- lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier);
- }
+ final double lagPerPartition = metrics.getAggregateLag() /
metrics.getPartitionCount();
+ double lagBusyFactor = 0.;
+
+ // Lag-amplified idle decay using ln(lagSeverity) / ln(maxSeverity).
+ if (highLagThreshold > 0 && lagPerPartition >= highLagThreshold) {
+ final double lagSeverity = lagPerPartition / highLagThreshold;
+ lagBusyFactor = Math.min(1.0, Math.log(lagSeverity) /
Math.log(LAG_AMPLIFICATION_MAX_SEVERITY));
}
// Clamp to valid range [0, 1]
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index 364e0d75b80..96e15672bd1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -21,15 +21,16 @@ package
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_PROCESSING_RATE;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
-import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_DOWN_BARRIER;
public class CostBasedAutoScalerConfigTest
{
@@ -50,7 +51,8 @@ public class CostBasedAutoScalerConfigTest
+ " \"lagWeight\": 0.6,\n"
+ " \"idleWeight\": 0.4,\n"
+ " \"defaultProcessingRate\": 2000.0,\n"
- + " \"scaleDownBarrier\": 10,\n"
+ + " \"highLagThreshold\": 30000,\n"
+ + " \"minScaleDownDelay\": \"PT10M\",\n"
+ " \"scaleDownDuringTaskRolloverOnly\": true\n"
+ "}";
@@ -66,8 +68,9 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
- Assert.assertEquals(10, config.getScaleDownBarrier());
+ Assert.assertEquals(Duration.standardMinutes(10),
config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
+ Assert.assertEquals(30000, config.getHighLagThreshold());
// Test serialization back to JSON
String serialized = mapper.writeValueAsString(config);
@@ -94,14 +97,19 @@ public class CostBasedAutoScalerConfigTest
// Check defaults
Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS,
config.getScaleActionPeriodMillis());
- Assert.assertEquals(DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS,
config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(
+ DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS,
+ config.getMinTriggerScaleActionFrequencyMillis()
+ );
Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
Assert.assertEquals(DEFAULT_PROCESSING_RATE,
config.getDefaultProcessingRate(), 0.001);
- Assert.assertEquals(DEFAULT_SCALE_DOWN_BARRIER,
config.getScaleDownBarrier());
+ Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY,
config.getMinScaleDownDelay());
Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
Assert.assertNull(config.getTaskCountStart());
Assert.assertNull(config.getStopTaskCountRatio());
+ // When highLagThreshold is not set, it defaults to -1 (burst scale-up
disabled)
+ Assert.assertEquals(-1, config.getHighLagThreshold());
}
@Test
@@ -174,19 +182,20 @@ public class CostBasedAutoScalerConfigTest
public void testBuilder()
{
CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(5)
-
.taskCountStart(10)
-
.enableTaskAutoScaler(true)
-
.minTriggerScaleActionFrequencyMillis(600000L)
-
.stopTaskCountRatio(0.8)
-
.scaleActionPeriodMillis(60000L)
-
.lagWeight(0.6)
-
.idleWeight(0.4)
-
.defaultProcessingRate(2000.0)
-
.scaleDownBarrier(10)
-
.scaleDownDuringTaskRolloverOnly(true)
- .build();
+
.taskCountMax(100)
+
.taskCountMin(5)
+
.taskCountStart(10)
+
.enableTaskAutoScaler(true)
+
.minTriggerScaleActionFrequencyMillis(600000L)
+
.stopTaskCountRatio(0.8)
+
.scaleActionPeriodMillis(60000L)
+ .lagWeight(0.6)
+
.idleWeight(0.4)
+
.defaultProcessingRate(2000.0)
+
.minScaleDownDelay(Duration.standardMinutes(10))
+
.scaleDownDuringTaskRolloverOnly(true)
+
.highLagThreshold(30000)
+ .build();
Assert.assertTrue(config.getEnableTaskAutoScaler());
Assert.assertEquals(100, config.getTaskCountMax());
@@ -198,7 +207,8 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
- Assert.assertEquals(10, config.getScaleDownBarrier());
+ Assert.assertEquals(Duration.standardMinutes(10),
config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
+ Assert.assertEquals(30000, config.getHighLagThreshold());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
new file mode 100644
index 00000000000..122a449c5f7
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CostBasedAutoScaler#computeExtraPPTIncrease}.
+ * <p>
+ * The burst scaling uses a logarithmic formula:
+ * {@code deltaTasks = K * ln(lagSeverity)}
+ * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}.
+ */
+public class CostBasedAutoScalerHighLagScalingTest
+{
+ private static final int LAG_THRESHOLD = 50_000;
+ private static final int PARTITION_COUNT = 48;
+ private static final int TASK_COUNT_MAX = 48;
+
+ /**
+ * Tests scaling behavior across different lag levels and task counts.
+ * <p>
+ * Expected behavior for 48 partitions with threshold=50K:
+ * <pre>
+ * | Current | Lag/Part | PPT reduction | Notes                          |
+ * |---------|----------|---------------|--------------------------------|
+ * | any     | <50K     | 0             | Below threshold                |
+ * | any     | =50K     | 0             | ln(1) = 0                      |
+ * | 1       | 100K     | 40            | Significant boost for recovery |
+ * | 1       | 200K     | 43            | Large boost                    |
+ * | 4       | 200K     | 6             | Moderate boost                 |
+ * | 12      | 200K     | 0             | Delta too small for PPT change |
+ * | 24      | 200K     | 0             | Delta too small for PPT change |
+ * </pre>
+ */
+ @Test
+ public void testComputeExtraPPTIncrease()
+ {
+ // Below threshold: no boost
+ Assert.assertEquals(
+ "Below threshold should not increase PPT",
+ 0,
+ CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 40_000L,
+ PARTITION_COUNT,
+ 4,
+ TASK_COUNT_MAX
+ )
+ );
+
+ // At threshold (lagSeverity=1, ln(1)=0): no boost
+ Assert.assertEquals(
+ "At threshold (ln(1)=0) should not increase PPT",
+ 0,
+ CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 50_000L,
+ PARTITION_COUNT,
+ 4,
+ TASK_COUNT_MAX
+ )
+ );
+
+ // C=1, 100K lag (2x threshold): significant boost for emergency recovery
+ int boost1_100k = CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 100_000L,
+ PARTITION_COUNT,
+ 1,
+ TASK_COUNT_MAX
+ );
+ Assert.assertEquals("C=1, 100K lag boost", 40, boost1_100k);
+
+ // C=1, 200K lag (4x threshold): large boost
+ int boost1_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 200_000L,
+ PARTITION_COUNT,
+ 1,
+ TASK_COUNT_MAX
+ );
+ Assert.assertEquals("C=1, 200K lag boost", 43, boost1_200k);
+
+ // C=4, 200K lag: moderate boost (K decreases with sqrt(C))
+ int boost4_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 200_000L,
+ PARTITION_COUNT,
+ 4,
+ TASK_COUNT_MAX
+ );
+ Assert.assertEquals("C=4, 200K lag should yield a modest PPT increase", 6,
boost4_200k);
+
+ // C=12, 200K lag: delta too small to change PPT
+ int boost12_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 200_000L,
+ PARTITION_COUNT,
+ 12,
+ TASK_COUNT_MAX
+ );
+ Assert.assertEquals("C=12, 200K lag should not change PPT", 0,
boost12_200k);
+
+ // C=24, 200K lag: delta too small to change PPT
+ int boost24_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+ LAG_THRESHOLD,
+ PARTITION_COUNT * 200_000L,
+ PARTITION_COUNT,
+ 24,
+ TASK_COUNT_MAX
+ );
+ Assert.assertEquals("C=24, 200K lag should not change PPT", 0,
boost24_200k);
+ }
+
+ @Test
+ public void testComputeExtraPPTIncreaseInvalidInputs()
+ {
+ Assert.assertEquals(
+ 0,
+ CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000,
0, 4, 48)
+ );
+ Assert.assertEquals(
+ 0,
+ CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000,
48, 0, 48)
+ );
+ Assert.assertEquals(
+ 0,
+ CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000,
48, 4, 0)
+ );
+ Assert.assertEquals(
+ 0,
+ CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000,
-1, 4, 48)
+ );
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index becfd4964b4..d0e9f90f584 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -77,13 +77,13 @@ public class CostBasedAutoScalerMockTest
@Test
public void testScaleUpWhenOptimalGreaterThanCurrent()
{
- // Use config with barrier=2 to test counter reset behavior
+ // Use config with a long barrier to test cooldown behavior
CostBasedAutoScalerConfig barrierConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.scaleDownBarrier(2)
-
.build();
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.minScaleDownDelay(Duration.standardHours(1))
+
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
mockSupervisor,
@@ -94,14 +94,7 @@ public class CostBasedAutoScalerMockTest
int currentTaskCount = 10;
int scaleUpOptimal = 17;
- int scaleDownOptimal = 5;
-
- // First, increment the scaleDownCounter by making a scale-down attempt
- doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
- Assert.assertEquals("Scale-down blocked (counter=1)", -1,
autoScaler.computeTaskCountForScaleAction());
-
- // Now trigger scale-up, which should reset the counter
+ // Trigger scale-up, which should set the cooldown timer
doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(currentTaskCount, 5000.0, 0.1);
@@ -111,11 +104,11 @@ public class CostBasedAutoScalerMockTest
autoScaler.computeTaskCountForScaleAction()
);
- // Verify counter was reset: scale-down should be blocked again (counter
starts from 0)
- doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
+ // Verify cooldown blocks immediate subsequent scaling
+ doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
Assert.assertEquals(
- "Scale-down should be blocked after scale-up reset the counter",
+ "Scale action should be blocked during the cooldown window",
-1,
autoScaler.computeTaskCountForScaleAction()
);
@@ -140,13 +133,13 @@ public class CostBasedAutoScalerMockTest
@Test
public void testScaleDownBlockedReturnsMinusOne()
{
- // Use config with barrier=2 to test counter behavior
+ // Use config with a long barrier to test cooldown behavior
CostBasedAutoScalerConfig barrierConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.scaleDownBarrier(2)
-
.build();
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.minScaleDownDelay(Duration.standardHours(1))
+
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
mockSupervisor,
@@ -161,23 +154,16 @@ public class CostBasedAutoScalerMockTest
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
- // First attempt: counter=1, blocked
+ // First attempt: allowed (no prior scale action)
Assert.assertEquals(
- "Should return -1 when optimal is less than current (scale-down
blocked, counter=1)",
- -1,
- autoScaler.computeTaskCountForScaleAction()
- );
-
- // Second attempt: counter=2, succeeds (barrier reached)
- Assert.assertEquals(
- "Scale-down should succeed when barrier reached",
+ "Scale-down should succeed when no prior scale action exists",
optimalCount,
autoScaler.computeTaskCountForScaleAction()
);
- // Verify counter was reset: next scale-down should be blocked again
+ // Second attempt: blocked by cooldown
Assert.assertEquals(
- "Scale-down should be blocked after successful scale-down reset the
counter",
+ "Scale-down should be blocked during the cooldown window",
-1,
autoScaler.computeTaskCountForScaleAction()
);
@@ -300,8 +286,8 @@ public class CostBasedAutoScalerMockTest
int result = autoScaler.computeTaskCountForScaleAction();
Assert.assertEquals(
- "Should block scale-down even by one task",
- -1,
+ "Should allow scale-down by one task when cooldown has elapsed",
+ optimalCount,
result
);
}
@@ -314,7 +300,7 @@ public class CostBasedAutoScalerMockTest
.taskCountMin(1)
.enableTaskAutoScaler(true)
.scaleDownDuringTaskRolloverOnly(true)
-
.scaleDownBarrier(1) // Set the barrier to 1 so it would trigger immediately if
not blocked
+
.minScaleDownDelay(Duration.ZERO)
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
@@ -345,6 +331,7 @@ public class CostBasedAutoScalerMockTest
.taskCountMin(1)
.enableTaskAutoScaler(true)
.scaleDownDuringTaskRolloverOnly(true)
+
.minScaleDownDelay(Duration.ZERO)
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
@@ -357,11 +344,12 @@ public class CostBasedAutoScalerMockTest
int currentTaskCount = 50;
int optimalCount = 30;
- // Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
- doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+ // Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
without scaling
+ doReturn(currentTaskCount).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
autoScaler.computeTaskCountForScaleAction(); // This populates
lastKnownMetrics
+ doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
Assert.assertEquals(
"Should scale-down during rollover when
scaleDownDuringTaskRolloverOnly is true",
optimalCount,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 5deae662d75..bb6eca691a2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -25,12 +25,12 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -38,17 +38,13 @@ import java.util.Map;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
-import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD;
-import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
-import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts;
import static org.mockito.Mockito.when;
-@SuppressWarnings("SameParameterValue")
+@SuppressWarnings({"SameParameterValue", "InstantiationOfUtilityClass"})
public class CostBasedAutoScalerTest
{
private CostBasedAutoScaler autoScaler;
- private CostBasedAutoScalerConfig config;
@Before
public void setUp()
@@ -62,169 +58,106 @@ public class CostBasedAutoScalerTest
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
when(mockIoConfig.getStream()).thenReturn("test-stream");
- config = CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(0.6)
- .idleWeight(0.4)
- .build();
+ CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+ .lagWeight(0.6)
+
.idleWeight(0.4)
+ .build();
autoScaler = new CostBasedAutoScaler(mockSupervisor, config,
mockSupervisorSpec, mockEmitter);
}
+ @SuppressWarnings("InstantiationOfUtilityClass")
@Test
public void testComputeValidTaskCounts()
{
- // For 100 partitions at 25 tasks (4 partitions/task), valid counts
include 25 and 34
- int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 1, 100);
- Assert.assertTrue("Should contain the current task count",
contains(validTaskCounts, 25));
- Assert.assertTrue("Should contain the next scale-up option",
contains(validTaskCounts, 34));
+ boolean useTaskCountBoundaries = true;
+ int highLagThreshold = 50_000;
- // Edge cases
- Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 1, 100).length);
- Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 1, 100).length);
+ // For 100 partitions at 25 tasks (4 partitions/task), valid counts
include 25 and 34
+ int[] validTaskCounts = computeValidTaskCounts(
+ 100,
+ 25,
+ 0L,
+ 1,
+ 100,
+ useTaskCountBoundaries,
+ highLagThreshold
+ );
+ Assert.assertTrue("Expected current task count to be included",
contains(validTaskCounts, 25));
+ Assert.assertTrue("Expected next scale-up option (34) to be included",
contains(validTaskCounts, 34));
// Single partition
- int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 1, 100);
- Assert.assertTrue("Single partition should have at least one valid count",
singlePartition.length > 0);
- Assert.assertTrue("Single partition should contain 1",
contains(singlePartition, 1));
+ int[] singlePartition = computeValidTaskCounts(
+ 1,
+ 1,
+ 0L,
+ 1,
+ 100,
+ useTaskCountBoundaries,
+ highLagThreshold
+ );
+ Assert.assertTrue("Single partition should yield at least one valid
count", singlePartition.length > 0);
+ Assert.assertTrue("Single partition should include task count 1",
contains(singlePartition, 1));
// Current exceeds partitions - should still yield valid, deduplicated
options
- int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 1, 100);
+ int[] exceedsPartitions = computeValidTaskCounts(
+ 2,
+ 5,
+ 0L,
+ 1,
+ 100,
+ useTaskCountBoundaries,
+ highLagThreshold
+ );
Assert.assertEquals(2, exceedsPartitions.length);
Assert.assertTrue(contains(exceedsPartitions, 1));
Assert.assertTrue(contains(exceedsPartitions, 2));
- // Lag expansion: low lag should not include max, high lag should
- int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30);
+ // Lag expansion: low lag should not include max, high lag should allow
aggressive scaling
+ int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30,
useTaskCountBoundaries, highLagThreshold);
Assert.assertFalse("Low lag should not include max task count",
contains(lowLagCounts, 30));
- Assert.assertTrue("Low lag should cap scale up around 4 tasks",
contains(lowLagCounts, 4));
+ Assert.assertTrue("Low lag should cap scale-up around 4 tasks",
contains(lowLagCounts, 4));
+ // High lag uses logarithmic formula: K * ln(lagSeverity) where K =
P/(6.4*sqrt(C))
+ // For P=30, C=3, lagPerPartition=500K, threshold=50K: lagSeverity=10,
K=2.7, delta=6.2
+ // This allows controlled scaling to ~10-15 tasks (not all the way to max)
long highAggregateLag = 30L * 500_000L;
- int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 1,
30);
- Assert.assertTrue("High lag should allow scaling to max tasks",
contains(highLagCounts, 30));
+ int[] highLagCounts = computeValidTaskCounts(
+ 30,
+ 3,
+ highAggregateLag,
+ 1,
+ 30,
+ useTaskCountBoundaries,
+ highLagThreshold
+ );
+ Assert.assertTrue("High lag should allow scaling to 10 tasks",
contains(highLagCounts, 10));
+ Assert.assertTrue("High lag should allow scaling to 15 tasks",
contains(highLagCounts, 15));
+ Assert.assertFalse("High lag should not jump straight to max (30) from 3",
contains(highLagCounts, 30));
// Respects taskCountMax
- int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3);
- Assert.assertTrue("Should include taskCountMax when doable",
contains(cappedCounts, 3));
+ int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3,
useTaskCountBoundaries, highLagThreshold);
+ Assert.assertTrue("Should include taskCountMax when within bounds",
contains(cappedCounts, 3));
Assert.assertFalse("Should not exceed taskCountMax",
contains(cappedCounts, 4));
// Respects taskCountMin - filters out values below the minimum
// With partitionCount=100, currentTaskCount=10, the computed range includes values like 8, 9, 10, 12, 13
- int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100);
- Assert.assertFalse("Should not go below taskCountMin", contains(minCappedCounts, 8));
- Assert.assertFalse("Should not go below taskCountMin", contains(minCappedCounts, 9));
- Assert.assertTrue("Should include values at taskCountMin", contains(minCappedCounts, 10));
- Assert.assertTrue("Should include values above taskCountMin", contains(minCappedCounts, 12));
+ int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100, useTaskCountBoundaries, highLagThreshold);
+ Assert.assertFalse("Should not include values below taskCountMin (8)", contains(minCappedCounts, 8));
+ Assert.assertFalse("Should not include values below taskCountMin (9)", contains(minCappedCounts, 9));
+ Assert.assertTrue("Should include values at taskCountMin (10)", contains(minCappedCounts, 10));
+ Assert.assertTrue("Should include values above taskCountMin (12)", contains(minCappedCounts, 12));
// Both bounds applied together
- int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12);
- Assert.assertFalse("Should not go below taskCountMin",
contains(bothBounds, 8));
- Assert.assertFalse("Should not go below taskCountMin",
contains(bothBounds, 9));
- Assert.assertFalse("Should not exceed taskCountMax", contains(bothBounds,
13));
- Assert.assertTrue("Should include values at taskCountMin",
contains(bothBounds, 10));
- Assert.assertTrue("Should include values at taskCountMax",
contains(bothBounds, 12));
- }
-
- @Test
- public void testScalingExamplesTable()
- {
- int partitionCount = 30;
- int taskCountMax = 30;
- double pollIdleRatio = 0.1;
- double avgProcessingRate = 10.0;
-
- // Create a local autoScaler with taskCountMax matching the test parameters
- SupervisorSpec mockSpec = Mockito.mock(SupervisorSpec.class);
- SeekableStreamSupervisor mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
- ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class);
- SeekableStreamSupervisorIOConfig mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
-
- when(mockSpec.getId()).thenReturn("test-supervisor");
- when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
- when(mockIoConfig.getStream()).thenReturn("test-stream");
-
- CostBasedAutoScalerConfig localConfig = CostBasedAutoScalerConfig.builder()
- .taskCountMax(taskCountMax)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(0.6)
- .idleWeight(0.4)
- .build();
-
- CostBasedAutoScaler localAutoScaler = new CostBasedAutoScaler(
- mockSupervisor,
- localConfig,
- mockSpec,
- mockEmitter
- );
-
- class Example
- {
- final int currentTasks;
- final long lagPerPartition;
- final int expectedTasks;
-
- Example(int currentTasks, long lagPerPartition, int expectedTasks)
- {
- this.currentTasks = currentTasks;
- this.lagPerPartition = lagPerPartition;
- this.expectedTasks = expectedTasks;
- }
- }
-
- Example[] examples = new Example[]{
- new Example(3, EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 8),
- new Example(3, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 3, 15),
- new Example(3, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 5, 30),
- new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD - 1, 15),
- new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 3, 30),
- new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30),
- new Example(20, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30),
- new Example(25, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30)
- };
-
- for (Example example : examples) {
- long aggregateLag = example.lagPerPartition * partitionCount;
- int[] validCounts = computeValidTaskCounts(partitionCount, example.currentTasks, aggregateLag, 1, taskCountMax);
- Assert.assertTrue(
- "Should include expected task count for current=" +
example.currentTasks + ", lag=" + example.lagPerPartition,
- contains(validCounts, example.expectedTasks)
- );
-
- CostMetrics metrics = createMetricsWithRate(
- example.lagPerPartition,
- example.currentTasks,
- partitionCount,
- pollIdleRatio,
- avgProcessingRate
- );
- int actualOptimal = localAutoScaler.computeOptimalTaskCount(metrics);
- if (actualOptimal == -1) {
- actualOptimal = example.currentTasks;
- }
- Assert.assertEquals(
- "Optimal task count should match for current=" + example.currentTasks
- + ", lag=" + example.lagPerPartition
- + ", valid=" + Arrays.toString(validCounts),
- example.expectedTasks,
- actualOptimal
- );
- }
- }
-
- @Test
- public void testComputeExtraPPTIncrease()
- {
- // No extra increase below the threshold
- Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD - 1, 30, 3, 30));
- Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30));
-
- // More aggressive increase when the lag is high
- Assert.assertEquals(8, computeExtraMaxPartitionsPerTaskIncrease(30L * AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 5, 30, 3, 30));
- // Zero when on max task count
- Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30, 30, 30));
+ int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12, useTaskCountBoundaries, highLagThreshold);
+ Assert.assertFalse("Should not include values below taskCountMin (8)", contains(bothBounds, 8));
+ Assert.assertFalse("Should not include values below taskCountMin (9)", contains(bothBounds, 9));
+ Assert.assertFalse("Should not include values above taskCountMax (13)", contains(bothBounds, 13));
+ Assert.assertTrue("Should include values at taskCountMin (10)", contains(bothBounds, 10));
+ Assert.assertTrue("Should include values at taskCountMax (12)", contains(bothBounds, 12));
}
@Test
@@ -238,28 +171,38 @@ public class CostBasedAutoScalerTest
// High idle (underutilized) - should scale down
int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
- Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult < 25);
+ Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)", scaleDownResult < 25);
// Very high idle with high task count - should scale down
int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
- Assert.assertTrue("Scale down scenario should return optimal <= current", highIdleResult <= 50);
+ Assert.assertTrue("High idle should not suggest scale-up", highIdleResult <= 50);
// With low idle and balanced weights, the algorithm should not scale up aggressively
int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
- Assert.assertTrue("With low idle and balanced weights, should not scale up aggressively", lowIdleResult <= 25);
+ Assert.assertTrue("With low idle and balanced weights, avoid aggressive scale-up", lowIdleResult <= 25);
}
@Test
public void testExtractPollIdleRatio()
{
// Null and empty return 0
- Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(null), 0.0001);
- Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()), 0.0001);
+ Assert.assertEquals("Null stats should yield 0 idle ratio", 0., CostBasedAutoScaler.extractPollIdleRatio(null), 0.0001);
+ Assert.assertEquals(
+ "Empty stats should yield 0 idle ratio",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()),
+ 0.0001
+ );
// Missing metrics return 0
Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new
HashMap<>()));
- Assert.assertEquals(0.,
CostBasedAutoScaler.extractPollIdleRatio(missingMetrics), 0.0001);
+ Assert.assertEquals(
+ "Missing autoscaler metrics should yield 0 idle ratio",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(missingMetrics),
+ 0.0001
+ );
// Valid stats return average
Map<String, Map<String, Object>> validStats = new HashMap<>();
@@ -267,26 +210,46 @@ public class CostBasedAutoScalerTest
group.put("task-0", buildTaskStatsWithPollIdle(0.3));
group.put("task-1", buildTaskStatsWithPollIdle(0.5));
validStats.put("0", group);
- Assert.assertEquals(0.4, CostBasedAutoScaler.extractPollIdleRatio(validStats), 0.0001);
+ Assert.assertEquals(
+ "Average poll idle ratio should be computed across tasks",
+ 0.4,
+ CostBasedAutoScaler.extractPollIdleRatio(validStats),
+ 0.0001
+ );
// Invalid types: non-map task metric
Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
- Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
+ Assert.assertEquals(
+ "Non-map task stats should be ignored",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(nonMapTask),
+ 0.0001
+ );
// Invalid types: empty autoscaler metrics
Map<String, Map<String, Object>> emptyAutoscaler = new HashMap<>();
Map<String, Object> taskStats1 = new HashMap<>();
taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new HashMap<>());
emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
- Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
+ Assert.assertEquals(
+ "Empty autoscaler metrics should yield 0 idle ratio",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler),
+ 0.0001
+ );
// Invalid types: non-map autoscaler metrics
Map<String, Map<String, Object>> nonMapAutoscaler = new HashMap<>();
Map<String, Object> taskStats2 = new HashMap<>();
taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, "not-a-map");
nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
- Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
+ Assert.assertEquals(
+ "Non-map autoscaler metrics should be ignored",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler),
+ 0.0001
+ );
// Invalid types: non-number poll idle ratio
Map<String, Map<String, Object>> nonNumberRatio = new HashMap<>();
@@ -295,20 +258,35 @@ public class CostBasedAutoScalerTest
autoscalerMetrics.put(SeekableStreamIndexTaskRunner.POLL_IDLE_RATIO_KEY, "not-a-number");
taskStats3.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, autoscalerMetrics);
nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
- Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio), 0.0001);
+ Assert.assertEquals(
+ "Non-numeric poll idle ratio should be ignored",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio),
+ 0.0001
+ );
}
@Test
public void testExtractMovingAverage()
{
// Null and empty return -1
- Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(null), 0.0001);
- Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()), 0.0001);
+ Assert.assertEquals("Null stats should yield -1 moving average", -1., CostBasedAutoScaler.extractMovingAverage(null), 0.0001);
+ Assert.assertEquals(
+ "Empty stats should yield -1 moving average",
+ -1.,
+ CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()),
+ 0.0001
+ );
// Missing metrics return -1
Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new
HashMap<>()));
- Assert.assertEquals(-1.,
CostBasedAutoScaler.extractMovingAverage(missingMetrics), 0.0001);
+ Assert.assertEquals(
+ "Missing moving averages should yield -1",
+ -1.,
+ CostBasedAutoScaler.extractMovingAverage(missingMetrics),
+ 0.0001
+ );
// Valid stats return average (using 5-minute)
Map<String, Map<String, Object>> validStats = new HashMap<>();
@@ -316,32 +294,80 @@ public class CostBasedAutoScalerTest
group.put("task-0", buildTaskStatsWithMovingAverage(1000.0));
group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
validStats.put("0", group);
- Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
+ Assert.assertEquals(
+ "Average 5-minute processing rate should be computed across tasks",
+ 1500.0,
+ CostBasedAutoScaler.extractMovingAverage(validStats),
+ 0.0001
+ );
// Interval fallback: 15-minute preferred, then 5-minute, then 1-minute
Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
- fifteenMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
- Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
+ fifteenMin.put(
+ "0",
+ Collections.singletonMap(
+ "task-0",
+ buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)
+ )
+ );
+ Assert.assertEquals(
+ "15-minute interval should be preferred when available",
+ 1500.0,
+ CostBasedAutoScaler.extractMovingAverage(fifteenMin),
+ 0.0001
+ );
// 1-minute as a final fallback
Map<String, Map<String, Object>> oneMin = new HashMap<>();
- oneMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
- Assert.assertEquals(500.0,
CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
+ oneMin.put(
+ "0",
+ Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0))
+ );
+ Assert.assertEquals(
+ "1-minute interval should be used as a final fallback",
+ 500.0,
+ CostBasedAutoScaler.extractMovingAverage(oneMin),
+ 0.0001
+ );
// 15-minute preferred over 5-minute when both available
Map<String, Map<String, Object>> allIntervals = new HashMap<>();
- allIntervals.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0)));
- Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(allIntervals), 0.0001);
+ allIntervals.put(
+ "0",
+ Collections.singletonMap("task-0",
buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0))
+ );
+ Assert.assertEquals(
+ "15-minute interval should win when multiple intervals are present",
+ 1500.0,
+ CostBasedAutoScaler.extractMovingAverage(allIntervals),
+ 0.0001
+ );
// Falls back to 5-minute when 15-minute is null
Map<String, Map<String, Object>> nullFifteen = new HashMap<>();
- nullFifteen.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)));
- Assert.assertEquals(750.0,
CostBasedAutoScaler.extractMovingAverage(nullFifteen), 0.0001);
+ nullFifteen.put(
+ "0",
+ Collections.singletonMap(
+ "task-0",
+ buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)
+ )
+ );
+ Assert.assertEquals(
+ "Should fall back to 5-minute when 15-minute is null",
+ 750.0,
+ CostBasedAutoScaler.extractMovingAverage(nullFifteen),
+ 0.0001
+ );
// Falls back to 1-minute when both 15 and 5 are null
Map<String, Map<String, Object>> bothNull = new HashMap<>();
bothNull.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithTwoNullIntervals(250.0)));
- Assert.assertEquals(250.0,
CostBasedAutoScaler.extractMovingAverage(bothNull), 0.0001);
+ Assert.assertEquals(
+ "Should fall back to 1-minute when 15 and 5 are null",
+ 250.0,
+ CostBasedAutoScaler.extractMovingAverage(bothNull),
+ 0.0001
+ );
}
@Test
@@ -350,19 +376,34 @@ public class CostBasedAutoScalerTest
// Non-map task metric
Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
- Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
+ Assert.assertEquals(
+ "Non-map task stats should be ignored",
+ -1.,
+ CostBasedAutoScaler.extractMovingAverage(nonMapTask),
+ 0.0001
+ );
Map<String, Map<String, Object>> missingBuild = new HashMap<>();
Map<String, Object> taskStats1 = new HashMap<>();
taskStats1.put("movingAverages", new HashMap<>());
missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
- Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
+ Assert.assertEquals(
+ "Missing buildSegments moving average should yield -1",
+ -1.,
+ CostBasedAutoScaler.extractMovingAverage(missingBuild),
+ 0.0001
+ );
Map<String, Map<String, Object>> nonMapMA = new HashMap<>();
Map<String, Object> taskStats2 = new HashMap<>();
taskStats2.put("movingAverages", "not-a-map");
nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
- Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
+ Assert.assertEquals(
+ "Non-map movingAverages should be ignored",
+ -1.,
+ CostBasedAutoScaler.extractMovingAverage(nonMapMA),
+ 0.0001
+ );
}
@Test
@@ -377,26 +418,29 @@ public class CostBasedAutoScalerTest
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("stream");
- // Test config defaults for scaleDownBarrier, defaultProcessingRate, scaleDownDuringTaskRolloverOnly
+ // Test config defaults for minScaleDownDelay, defaultProcessingRate, scaleDownDuringTaskRolloverOnly
CostBasedAutoScalerConfig cfgWithDefaults = CostBasedAutoScalerConfig.builder()
- .taskCountMax(10)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .build();
- Assert.assertEquals(5, cfgWithDefaults.getScaleDownBarrier());
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .build();
+ Assert.assertEquals(
+ CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY,
+ cfgWithDefaults.getMinScaleDownDelay()
+ );
Assert.assertEquals(1000.0, cfgWithDefaults.getDefaultProcessingRate(), 0.001);
Assert.assertFalse(cfgWithDefaults.isScaleDownOnTaskRolloverOnly());
// Test custom config values
CostBasedAutoScalerConfig cfgWithCustom = CostBasedAutoScalerConfig.builder()
- .taskCountMax(10)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .scaleDownBarrier(10)
- .defaultProcessingRate(5000.0)
- .scaleDownDuringTaskRolloverOnly(true)
- .build();
- Assert.assertEquals(10, cfgWithCustom.getScaleDownBarrier());
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .minScaleDownDelay(Duration.standardMinutes(10))
+ .defaultProcessingRate(5000.0)
+ .scaleDownDuringTaskRolloverOnly(true)
+ .build();
+ Assert.assertEquals(Duration.standardMinutes(10), cfgWithCustom.getMinScaleDownDelay());
Assert.assertEquals(5000.0, cfgWithCustom.getDefaultProcessingRate(), 0.001);
Assert.assertTrue(cfgWithCustom.isScaleDownOnTaskRolloverOnly());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 4f5832915f6..c5b2b867e66 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -102,7 +102,7 @@ public class WeightedCostFunctionTest
// Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig).totalCost();
- Assert.assertEquals("Cost at current tasks", 1000., costCurrent, 0.1);
+ Assert.assertEquals("Cost of current tasks", 1000., costCurrent, 0.1);
// Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666
double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig).totalCost();
@@ -298,11 +298,12 @@ public class WeightedCostFunctionTest
@Test
public void testLagAmplificationReducesIdleUnderHighLag()
{
- CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder()
+ CostBasedAutoScalerConfig configWithThreshold = CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(1)
.enableTaskAutoScaler(true)
.defaultProcessingRate(1000.0)
+ .highLagThreshold(10_000)
.build();
int currentTaskCount = 3;
@@ -310,17 +311,96 @@ public class WeightedCostFunctionTest
int partitionCount = 30;
double pollIdleRatio = 0.1;
+ // lowLag (5000) is below threshold, highLag (500000) is well above
CostMetrics lowLag = createMetrics(5_000.0, currentTaskCount, partitionCount, pollIdleRatio);
CostMetrics highLag = createMetrics(500_000.0, currentTaskCount, partitionCount, pollIdleRatio);
- double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, idleOnlyConfig).totalCost();
- double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, idleOnlyConfig).totalCost();
+ double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, configWithThreshold).totalCost();
+ double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, configWithThreshold).totalCost();
Assert.assertTrue(
"Higher lag should reduce predicted idle more aggressively",
lowLagCost > highLagCost
);
}
+ @Test
+ public void testCustomLagThresholdsAffectCostCalculation()
+ {
+ // Test that custom threshold values change behavior compared to defaults
+ int currentTaskCount = 3;
+ int proposedTaskCount = 8;
+ int partitionCount = 30;
+ double pollIdleRatio = 0.1;
+
+ // Use a lag that exceeds the sensitive threshold (10000); the default config leaves the threshold disabled (-1)
+ CostMetrics metrics = createMetrics(15_000.0, currentTaskCount, partitionCount, pollIdleRatio);
+
+ // Default config: highLagThreshold=-1 (disabled), no lag amplification
+ CostBasedAutoScalerConfig defaultConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .defaultProcessingRate(1000.0)
+ .build();
+
+ // Sensitive config: threshold 10000, lag 15000 > 10000, so amplification applies
+ CostBasedAutoScalerConfig sensitiveConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .defaultProcessingRate(1000.0)
+ .highLagThreshold(10000)
+ .build();
+
+ double defaultCost = costFunction.computeCost(metrics, proposedTaskCount, defaultConfig).totalCost();
+ double sensitiveCost = costFunction.computeCost(metrics, proposedTaskCount, sensitiveConfig).totalCost();
+
+ // With a lower threshold, the same lag triggers more aggressive scaling behavior
+ // (higher lagBusyFactor), which results in lower predicted idle and thus lower idle cost
+ Assert.assertTrue(
+ "More sensitive thresholds should result in different (lower) cost",
+ sensitiveCost < defaultCost
+ );
+ }
+
+ @Test
+ public void testLnSeverityScalesWithLag()
+ {
+ // Test that ln_severity lagBusyFactor increases with lag severity, producing lower idle cost at higher lag.
+ // lagSeverity = lagPerPartition / threshold
+ // lagBusyFactor = min(1.0, ln(lagSeverity) / ln(5))
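+ // A minimal sketch of that factor (illustrative only, not the production code):
+ //   double lagBusyFactor = Math.min(1.0, Math.log(lagSeverity) / Math.log(5.0));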
+ int currentTaskCount = 3;
+ int proposedTaskCount = 8;
+ int partitionCount = 30;
+ double pollIdleRatio = 0.1;
+
+ CostBasedAutoScalerConfig customConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .defaultProcessingRate(1000.0)
+ .highLagThreshold(10000)
+ .build();
+
+ // Lag exactly at threshold (lagPerPartition = 10000, severity=1.0)
+ // lagBusyFactor = ln(1) / ln(5) = 0
+ CostMetrics atThreshold = createMetrics(10_000.0, currentTaskCount, partitionCount, pollIdleRatio);
+
+ // Lag at 5x threshold (lagPerPartition = 50000, severity=5.0)
+ // lagBusyFactor = ln(5) / ln(5) = 1.0
+ CostMetrics atMaxSeverity = createMetrics(50_000.0, currentTaskCount, partitionCount, pollIdleRatio);
+
+ double costAtThreshold = costFunction.computeCost(atThreshold, proposedTaskCount, customConfig).totalCost();
+ double costAtMax = costFunction.computeCost(atMaxSeverity, proposedTaskCount, customConfig).totalCost();
+
+ // At max severity, lagBusyFactor=1.0, idle is fully suppressed → lower cost
+ Assert.assertTrue(
+ "Cost at max severity should be lower due to full idle suppression",
+ costAtMax < costAtThreshold
+ );
+ }
+
private CostMetrics createMetrics(
double avgPartitionLag,
int currentTaskCount,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]