This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e6a01b747c8 Adjust cost-based autoscaler algorithm (#18936)
e6a01b747c8 is described below
commit e6a01b747c81677dea02da3591791f26d5e333e1
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Mon Jan 26 19:29:41 2026 +0200
Adjust cost-based autoscaler algorithm (#18936)
---
.../supervisor/SeekableStreamSupervisor.java | 14 +-
.../SeekableStreamSupervisorIOConfig.java | 2 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 93 ++++++++++--
.../autoscaler/CostBasedAutoScalerConfig.java | 2 +-
.../autoscaler/WeightedCostFunction.java | 67 ++++++---
.../SeekableStreamSupervisorIOConfigTest.java | 2 +-
...treamSupervisorScaleDuringTaskRolloverTest.java | 19 +--
.../SeekableStreamSupervisorSpecTest.java | 4 +-
.../autoscaler/CostBasedAutoScalerTest.java | 164 ++++++++++++++++++---
.../autoscaler/WeightedCostFunctionTest.java | 134 ++++++++++-------
10 files changed, 377 insertions(+), 124 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index c5d3ffe873d..40049967b2b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3403,7 +3403,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
-
boolean endOffsetsAreInvalid = false;
for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
if (entry.getValue().equals(getEndOfPartitionMarker())) {
@@ -3453,17 +3452,26 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* This method is invoked to determine whether a task count adjustment is needed
* during a task rollover based on the recommendations from the task auto-scaler.
*/
+
@VisibleForTesting
void maybeScaleDuringTaskRollover()
{
if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
- if (rolloverTaskCount > 0) {
+ if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
changeTaskCountInIOConfig(rolloverTaskCount);
// Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
// This seems the best way to inject task amount recalculation during the rollover.
clearAllocationInfo();
+
+ ServiceMetricEvent.Builder event = ServiceMetricEvent
+ .builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+ .setDimension(DruidMetrics.DATASOURCE, dataSource)
+ .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
+
+ emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, rolloverTaskCount));
}
}
}
@@ -4048,7 +4056,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
builder.put(partitionId, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
}
} else {
- // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
+ // if we don't have a startingOffset (first run, or we had some previous failures and reset the sequences) then
// get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise)
OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(
partitionId,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 8a7fcad1511..0de06a63949 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -86,7 +86,7 @@ public abstract class SeekableStreamSupervisorIOConfig
// Could be null
this.autoScalerConfig = autoScalerConfig;
this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler();
- // if autoscaler is enabled then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin
+ // if the autoscaler is enabled, then taskCount will be ignored here and the initial taskCount will equal taskCountStart/taskCountMin
if (autoScalerEnabled) {
final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
this.taskCount = startTaskCount != null ? startTaskCount : autoScalerConfig.getTaskCountMin();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 598a56e41d1..350e2284359 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+import it.unimi.dsi.fastutil.ints.IntArraySet;
+import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -33,9 +35,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -57,6 +57,25 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+ /**
+ * Defines the step size used for evaluating lag when computing scaling actions.
+ * This constant controls the granularity of lag considered in scaling decisions,
+ * ensuring smoother transitions between scaled states and avoiding abrupt changes in task counts.
+ */
+ private static final int LAG_STEP = 100_000;
+ /**
+ * This parameter fine-tunes autoscaling behavior by adding extra flexibility
+ * when calculating the maximum allowable partitions per task in response to lag
+ * that must be processed as fast as possible.
+ * It acts as a base factor balancing the responsiveness and stability of autoscaling.
+ */
+ private static final int BASE_RAW_EXTRA = 5;
+ // Base per-partition lag threshold that activates a burst scale-up to eliminate high lag.
+ static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
+ // Higher per-partition lag threshold that activates an even more aggressive scale-up,
+ // also enabling lag-amplified idle decay in the cost function (to reduce the idle weight).
+ static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 100_000;
+
public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
@@ -160,9 +179,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
* Returns -1 (no scaling needed) in the following cases:
* <ul>
* <li>Metrics are not available</li>
- * <li>Task count already optimal</li>
- * <li>The current idle ratio is in the ideal range and lag considered low</li>
- * <li>Optimal task count equals current task count</li>
+ * <li>Current task count already optimal</li>
* </ul>
*
* @return optimal task count for scale-up, or -1 if no scaling action needed
@@ -180,7 +197,12 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
return -1;
}
- final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(partitionCount, currentTaskCount);
+ final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
+ partitionCount,
+ currentTaskCount,
+ (long) metrics.getAggregateLag(),
+ config.getTaskCountMax()
+ );
if (validTaskCounts.length == 0) {
log.warn("No valid task counts after applying constraints for
supervisorId [%s]", supervisorId);
@@ -230,22 +252,35 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
}
/**
- * Generates valid task counts based on partitions-per-task ratios.
+ * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation.
* This enables gradual scaling and avoids large jumps.
* Limits the range of task counts considered to avoid excessive computation.
*
* @return sorted list of valid task counts within bounds
*/
- static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
+ static int[] computeValidTaskCounts(
+ int partitionCount,
+ int currentTaskCount,
+ double aggregateLag,
+ int taskCountMax
+ )
{
- if (partitionCount <= 0) {
+ if (partitionCount <= 0 || currentTaskCount <= 0) {
return new int[]{};
}
- Set<Integer> result = new HashSet<>();
+ IntSet result = new IntArraySet();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
+ final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
+ aggregateLag,
+ partitionCount,
+ currentTaskCount,
+ taskCountMax
+ );
+ final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
+
// Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
- final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
+ final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
final int maxPartitionsPerTask = Math.min(
partitionCount,
currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
@@ -253,9 +288,41 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
- result.add(taskCount);
+ if (taskCount <= taskCountMax) {
+ result.add(taskCount);
+ }
}
- return result.stream().mapToInt(Integer::intValue).toArray();
+ return result.toIntArray();
+ }
+
+ /**
+ * Computes the extra allowed increase in partitions-per-task when the average per-partition lag
+ * is above {@code EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD}.
+ * Generally, one of the autoscaler's priorities is to keep the lag as close to zero as possible.
+ */
+ static int computeExtraMaxPartitionsPerTaskIncrease(
+ double aggregateLag,
+ int partitionCount,
+ int currentTaskCount,
+ int taskCountMax
+ )
+ {
+ if (partitionCount <= 0 || taskCountMax <= 0) {
+ return 0;
+ }
+
+ final double lagPerPartition = aggregateLag / partitionCount;
+ if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+ return 0;
+ }
+
+ int rawExtra = BASE_RAW_EXTRA;
+ if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+ rawExtra += (int) ((lagPerPartition - AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP);
+ }
+
+ final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount / taskCountMax);
+ return (int) (rawExtra * headroomRatio);
}
/**
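
For reference, a standalone sketch (not part of the patch) that restates the lag-driven relaxation above, using the constants introduced in this commit (LAG_STEP = 100_000, BASE_RAW_EXTRA = 5, thresholds 50_000 / 100_000); the sample values mirror testComputeExtraPPTIncrease further down:

// Sketch only: re-states computeExtraMaxPartitionsPerTaskIncrease from the hunk above.
public class ExtraPptSketch
{
  static int extraIncrease(double aggregateLag, int partitionCount, int currentTaskCount, int taskCountMax)
  {
    if (partitionCount <= 0 || taskCountMax <= 0) {
      return 0;
    }
    final double lagPerPartition = aggregateLag / partitionCount;
    if (lagPerPartition < 50_000) {
      return 0;                       // low lag: keep the default PPT step of 2
    }
    int rawExtra = 5;                 // BASE_RAW_EXTRA
    if (lagPerPartition > 100_000) {
      // one extra unit per LAG_STEP (100_000) of lag beyond the aggressive threshold
      rawExtra += (int) ((lagPerPartition - 100_000) / 100_000);
    }
    // scale by remaining task headroom so a supervisor near taskCountMax stops expanding
    final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount / taskCountMax);
    return (int) (rawExtra * headroomRatio);
  }

  public static void main(String[] args)
  {
    // 30 partitions, 3 running tasks, taskCountMax 30
    System.out.println(extraIncrease(30L * 50_000L, 30, 3, 30));   // 4: rawExtra 5 * headroom 0.9
    System.out.println(extraIncrease(30L * 300_000L, 30, 3, 30));  // 6: rawExtra 7 * headroom 0.9
    System.out.println(extraIncrease(30L * 500_000L, 30, 30, 30)); // 0: no headroom left at taskCountMax
  }
}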
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index d1f682f4f56..aba26ba25b5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -52,7 +52,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
private final boolean enableTaskAutoScaler;
private final int taskCountMax;
private final int taskCountMin;
- private final Integer taskCountStart;
+ private Integer taskCountStart;
private final long minTriggerScaleActionFrequencyMillis;
private final Double stopTaskCountRatio;
private final long scaleActionPeriodMillis;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 0da733ef9e7..8a375955691 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -29,24 +29,25 @@ import org.apache.druid.java.util.common.logger.Logger;
public class WeightedCostFunction
{
private static final Logger log = new Logger(WeightedCostFunction.class);
-
-
/**
- * Ideal idle ratio range boundaries.
- * Idle ratio below MIN indicates tasks are overloaded (scale up needed).
- * Idle ratio above MAX indicates tasks are underutilized (scale down needed).
+ * Maximum multiplier applied to the lag-driven busy factor in the idle-ratio estimation.
+ * It caps the lag amplification effect so that significant partition lag cannot inflate
+ * the adjustment without bound, keeping cost-based auto-scaling decisions stable.
*/
- static final double IDEAL_IDLE_MIN = 0.2;
- static final double IDEAL_IDLE_MAX = 0.6;
-
+ private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0;
+ private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L;
/**
- * Checks if the given idle ratio is within the ideal range [{@value #IDEAL_IDLE_MIN}, {@value #IDEAL_IDLE_MAX}].
- * When idle is in this range, optimal utilization has been achieved and no scaling is needed.
+ * Denominator of the ramp formula in the cost computation logic: the difference between the
+ * maximum lag per partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling
+ * lag threshold (CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD).
+ * <p>
+ * It affects how the cost model evaluates scaling decisions in high-lag scenarios.
*/
- public static boolean isIdleInIdealRange(double idleRatio)
- {
- return idleRatio >= IDEAL_IDLE_MIN && idleRatio <= IDEAL_IDLE_MAX;
- }
+ private static final double RAMP_DENOMINATOR =
+ LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
/**
* Computes cost for a given task count using compute time metrics.
@@ -104,12 +105,15 @@ public class WeightedCostFunction
return new CostResult(cost, lagCost, weightedIdleCost);
}
-
/**
- * Estimates the idle ratio for a given task count using a capacity-based linear model.
+ * Estimates the idle ratio for a proposed task count.
+ * Includes a lag-based adjustment that reduces the predicted idle when there is
+ * outstanding work, so that high lag is worked off as quickly as possible.
* <p>
- * Formula: {@code predictedIdle = 1 - busyFraction / taskRatio}
- * where {@code busyFraction = 1 - currentIdleRatio} and {@code taskRatio = targetTaskCount / currentTaskCount}.
+ * Formulas:
+ * {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)}
+ * {@code lagBusyFactor = 1 - exp(-lagPerTask / AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD)}
+ * {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)}
*
* @param metrics current system metrics containing idle ratio and task count
* @param taskCount target task count to estimate an idle ratio for
@@ -119,7 +123,6 @@ public class WeightedCostFunction
{
final double currentPollIdleRatio = metrics.getPollIdleRatio();
- // Handle edge cases
if (currentPollIdleRatio < 0) {
// No idle data available, assume moderate idle
return 0.5;
@@ -130,13 +133,33 @@ public class WeightedCostFunction
return currentPollIdleRatio;
}
- // Capacity-based model: idle ratio reflects spare capacity per task
+ // Linear prediction (capacity-based) - existing logic
final double busyFraction = 1.0 - currentPollIdleRatio;
final double taskRatio = (double) taskCount / currentTaskCount;
- final double predictedIdleRatio = 1.0 - busyFraction / taskRatio;
+ final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio));
+
+ // Lag-based adjustment: more work per task → less idle
+ final double lagPerTask = metrics.getAggregateLag() / taskCount;
+ double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD);
+ final int partitionCount = metrics.getPartitionCount();
+
+ if (partitionCount > 0) {
+ final double lagPerPartition = metrics.getAggregateLag() / partitionCount;
+ // Lag-amplified idle decay
+ if (lagPerPartition >= CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+ double ramp = Math.max(0.0,
+ (lagPerPartition - CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) / RAMP_DENOMINATOR
+ );
+ ramp = Math.min(1.0, ramp);
+
+ final double multiplier = 1.0 + ramp * (LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0);
+ lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier);
+ }
+ }
// Clamp to valid range [0, 1]
- return Math.max(0.0, Math.min(1.0, predictedIdleRatio));
+ return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor));
}
}
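
To see how the lag-amplified idle decay above behaves, here is a standalone sketch (not part of the patch) restating the adjusted prediction with the same constants (thresholds 50_000 / 100_000, ramp denominator 450_000, max multiplier 2.0); the scenario numbers (3 -> 8 tasks, 30 partitions, 10% idle) are illustrative:

// Sketch only: mirrors the adjusted idle-ratio prediction from the hunk above.
public class IdlePredictionSketch
{
  static double predictIdle(double aggregateLag, int partitionCount, int currentTasks, int proposedTasks, double currentIdle)
  {
    final double busyFraction = 1.0 - currentIdle;
    final double taskRatio = (double) proposedTasks / currentTasks;
    final double linear = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio));

    // more outstanding lag per task -> less predicted idle
    double lagBusyFactor = 1.0 - Math.exp(-(aggregateLag / proposedTasks) / 100_000.0);

    final double lagPerPartition = aggregateLag / partitionCount;
    if (lagPerPartition >= 50_000) {
      // ramp from 1x at 50k lag/partition up to the 2x cap at 500k lag/partition
      final double ramp = Math.min(1.0, (lagPerPartition - 50_000) / 450_000.0);
      lagBusyFactor = Math.min(1.0, lagBusyFactor * (1.0 + ramp));
    }
    return Math.max(0.0, linear * (1.0 - lagBusyFactor));
  }

  public static void main(String[] args)
  {
    System.out.println(predictIdle(30L * 10_000L, 30, 3, 8, 0.1));  // ~0.455: modest lag leaves real idle headroom
    System.out.println(predictIdle(30L * 500_000L, 30, 3, 8, 0.1)); // ~0.0: heavy lag collapses the predicted idle
  }
}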
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
index a97e971028c..56b8904cc18 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
@@ -80,7 +80,7 @@ public class SeekableStreamSupervisorIOConfigTest
}
@Test
- public void testAutoScalerEnabledTrueAndFalse()
+ public void testAutoScalerEnabledPreservesTaskCountWhenNonNull()
{
LagAggregator lagAggregator = mock(LagAggregator.class);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
index 2c4275a99ec..b39e6097903 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -61,11 +61,7 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
supervisor.maybeScaleDuringTaskRollover();
// Then
- Assert.assertEquals(
- "Task count should not change when taskAutoScaler is null",
- beforeTaskCount,
- (int) supervisor.getIoConfig().getTaskCount()
- );
+ Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig());
}
@Test
@@ -88,6 +84,7 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
supervisor.maybeScaleDuringTaskRollover();
// Then
+ Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
Assert.assertEquals(
"Task count should not change when rolloverTaskCount <= 0",
beforeTaskCount,
@@ -111,12 +108,11 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
supervisor.start();
supervisor.createAutoscaler(spec);
- Assert.assertEquals(DEFAULT_TASK_COUNT, (int)
supervisor.getIoConfig().getTaskCount());
-
// When
supervisor.maybeScaleDuringTaskRollover();
// Then
+ Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
Assert.assertEquals(
"Task count should be updated to " + targetTaskCount + " when
rolloverTaskCount > 0",
targetTaskCount,
@@ -144,6 +140,7 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
supervisor.maybeScaleDuringTaskRollover();
// Then
+ Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
Assert.assertEquals(
"Task count should not change when rolloverTaskCount is 0",
beforeTaskCount,
@@ -201,13 +198,11 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig()
{
return CostBasedAutoScalerConfig.builder()
+ .enableTaskAutoScaler(true)
.taskCountMax(100)
.taskCountMin(1)
- .taskCountStart(DEFAULT_TASK_COUNT)
- .enableTaskAutoScaler(true)
- .lagWeight(0.25)
- .idleWeight(0.75)
- .scaleActionPeriodMillis(100)
+ .taskCountStart(1)
+ .scaleActionPeriodMillis(60000)
.build();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 7df22c467a1..db805c21187 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -1457,7 +1457,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
1,
- taskCount,
+ null, // autoscaler uses taskCountStart/taskCountMin for the initial value
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
@@ -1478,7 +1478,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
1,
- taskCount,
+ null, // autoscaler uses taskCountStart/taskCountMin for the initial value
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index caf5453f521..54299d915f6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -30,6 +30,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -37,11 +38,16 @@ import java.util.Map;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease;
+import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts;
import static org.mockito.Mockito.when;
+@SuppressWarnings("SameParameterValue")
public class CostBasedAutoScalerTest
{
private CostBasedAutoScaler autoScaler;
+ private CostBasedAutoScalerConfig config;
@Before
public void setUp()
@@ -55,13 +61,13 @@ public class CostBasedAutoScalerTest
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
when(mockIoConfig.getStream()).thenReturn("test-stream");
- CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(0.6)
- .idleWeight(0.4)
- .build();
+ config = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.6)
+ .idleWeight(0.4)
+ .build();
autoScaler = new CostBasedAutoScaler(mockSupervisor, config,
mockSupervisorSpec, mockEmitter);
}
@@ -70,26 +76,122 @@ public class CostBasedAutoScalerTest
public void testComputeValidTaskCounts()
{
// For 100 partitions at 25 tasks (4 partitions/task), valid counts
include 25 and 34
- int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100,
25);
+ int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 100);
Assert.assertTrue("Should contain the current task count",
contains(validTaskCounts, 25));
Assert.assertTrue("Should contain the next scale-up option",
contains(validTaskCounts, 34));
// Edge cases
- Assert.assertEquals("Zero partitions return empty array", 0,
CostBasedAutoScaler.computeValidTaskCounts(0, 10).length);
- Assert.assertEquals("Negative partitions return empty array", 0,
CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length);
+ Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 100).length);
+ Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 100).length);
// Single partition
- int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1);
+ int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 100);
Assert.assertTrue("Single partition should have at least one valid count",
singlePartition.length > 0);
Assert.assertTrue("Single partition should contain 1",
contains(singlePartition, 1));
// Current exceeds partitions - should still yield valid, deduplicated
options
- int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5);
+ int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 100);
Assert.assertEquals(2, exceedsPartitions.length);
Assert.assertTrue(contains(exceedsPartitions, 1));
Assert.assertTrue(contains(exceedsPartitions, 2));
}
+ @Test
+ public void testComputeValidTaskCountsLagExpansion()
+ {
+ int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30);
+ Assert.assertFalse("Low lag should not include max task count",
contains(lowLagCounts, 30));
+ Assert.assertTrue("Low lag should cap scale up around 4 tasks",
contains(lowLagCounts, 4));
+
+ long highAggregateLag = 30L * 500_000L;
+ int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30);
+ Assert.assertTrue("High lag should allow scaling to max tasks",
contains(highLagCounts, 30));
+ }
+
+ @Test
+ public void testComputeValidTaskCountsRespectsTaskCountMax()
+ {
+ long highAggregateLag = 30L * 500_000L;
+ int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3);
+ Assert.assertTrue("Should include taskCountMax when doable",
contains(cappedCounts, 3));
+ Assert.assertFalse("Should not exceed taskCountMax",
contains(cappedCounts, 4));
+ }
+
+ @Test
+ public void testScalingExamplesTable()
+ {
+ int partitionCount = 30;
+ int taskCountMax = 30;
+ double pollIdleRatio = 0.1;
+ double avgProcessingRate = 10.0;
+
+ class Example
+ {
+ final int currentTasks;
+ final long lagPerPartition;
+ final int expectedTasks;
+
+ Example(int currentTasks, long lagPerPartition, int expectedTasks)
+ {
+ this.currentTasks = currentTasks;
+ this.lagPerPartition = lagPerPartition;
+ this.expectedTasks = expectedTasks;
+ }
+ }
+
+ Example[] examples = new Example[]{
+ new Example(3, 50_000L, 8),
+ new Example(3, 300_000L, 15),
+ new Example(3, 500_000L, 30),
+ new Example(10, 100_000L, 15),
+ new Example(10, 300_000L, 30),
+ new Example(10, 500_000L, 30),
+ new Example(20, 500_000L, 30),
+ new Example(25, 500_000L, 30)
+ };
+
+ for (Example example : examples) {
+ long aggregateLag = example.lagPerPartition * partitionCount;
+ int[] validCounts = computeValidTaskCounts(partitionCount,
example.currentTasks, aggregateLag, taskCountMax);
+ Assert.assertTrue(
+ "Should include expected task count for current=" +
example.currentTasks + ", lag=" + example.lagPerPartition,
+ contains(validCounts, example.expectedTasks)
+ );
+
+ CostMetrics metrics = createMetricsWithRate(
+ example.lagPerPartition,
+ example.currentTasks,
+ partitionCount,
+ pollIdleRatio,
+ avgProcessingRate
+ );
+ int actualOptimal = autoScaler.computeOptimalTaskCount(metrics);
+ if (actualOptimal == -1) {
+ actualOptimal = example.currentTasks;
+ }
+ Assert.assertEquals(
+ "Optimal task count should match for current=" + example.currentTasks
+ + ", lag=" + example.lagPerPartition
+ + ", valid=" + Arrays.toString(validCounts),
+ example.expectedTasks,
+ actualOptimal
+ );
+ }
+ }
+
+ @Test
+ public void testComputeExtraPPTIncrease()
+ {
+ // No extra increase below the threshold
+ Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L *
49_000L, 30, 3, 30));
+ Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L *
EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30));
+
+ // More aggressive increase when the lag is high
+ Assert.assertEquals(6, computeExtraMaxPartitionsPerTaskIncrease(30L *
300_000L, 30, 3, 30));
+ // Zero when on max task count
+ Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L *
500_000L, 30, 30, 30));
+ }
+
@Test
public void testComputeOptimalTaskCountInvalidInputs()
{
@@ -110,7 +212,7 @@ public class CostBasedAutoScalerTest
int highIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
Assert.assertTrue("Scale down scenario should return optimal <= current",
highIdleResult <= 50);
- // With low idle and balanced weights, algorithm should not scale up
aggressively
+ // With low idle and balanced weights, the algorithm should not scale up
aggressively
int lowIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
Assert.assertTrue("With low idle and balanced weights, should not scale up
aggressively", lowIdleResult <= 25);
}
@@ -194,10 +296,16 @@ public class CostBasedAutoScalerTest
{
// 15-minute average is preferred
Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
- fifteenMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
+ fifteenMin.put(
+ "0",
+ Collections.singletonMap(
+ "task-0",
+ buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME,
1500.0)
+ )
+ );
Assert.assertEquals(1500.0,
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
- // 1-minute as final fallback
+ // 1-minute as a final fallback
Map<String, Map<String, Object>> oneMin = new HashMap<>();
oneMin.put("0", Collections.singletonMap("task-0",
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
Assert.assertEquals(500.0,
CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
@@ -328,7 +436,7 @@ public class CostBasedAutoScalerTest
@Test
public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
{
- // Tests the case where lastKnownMetrics is null (no
computeTaskCountForScaleAction called)
+ // Tests the case where the lastKnownMetrics is null (no
computeTaskCountForScaleAction called)
SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
@@ -369,6 +477,24 @@ public class CostBasedAutoScalerTest
);
}
+ private CostMetrics createMetricsWithRate(
+ double avgPartitionLag,
+ int currentTaskCount,
+ int partitionCount,
+ double pollIdleRatio,
+ double avgProcessingRate
+ )
+ {
+ return new CostMetrics(
+ avgPartitionLag,
+ currentTaskCount,
+ partitionCount,
+ pollIdleRatio,
+ 3600,
+ avgProcessingRate
+ );
+ }
+
private boolean contains(int[] array, int value)
{
for (int i : array) {
@@ -434,7 +560,11 @@ public class CostBasedAutoScalerTest
return taskStats;
}
- private Map<String, Object> buildTaskStatsWithNullInterval(String
nullInterval, String validInterval, double processedRate)
+ private Map<String, Object> buildTaskStatsWithNullInterval(
+ String nullInterval,
+ String validInterval,
+ double processedRate
+ )
{
Map<String, Object> buildSegments = new HashMap<>();
buildSegments.put(nullInterval, null);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 90b50477c92..416def7e3ab 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -51,20 +51,24 @@ public class WeightedCostFunctionTest
Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0);
Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0);
Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY,
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10,
config).totalCost(), 0.0);
+ Assert.assertEquals(
+ Double.POSITIVE_INFINITY,
+ costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10,
config).totalCost(),
+ 0.0
+ );
}
@Test
public void testScaleDownHasHigherLagCostThanCurrent()
{
CostBasedAutoScalerConfig lagOnlyConfig =
CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(1.0)
- .idleWeight(0.0)
- .defaultProcessingRate(100.0)
- .build();
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .defaultProcessingRate(100.0)
+ .build();
CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3);
@@ -85,13 +89,13 @@ public class WeightedCostFunctionTest
// With lag-only config (no idle penalty), the marginal model is used for
scale-up:
// lagRecoveryTime = aggregateLag / (taskCountDiff * rate)
CostBasedAutoScalerConfig lagOnlyConfig =
CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(1.0)
- .idleWeight(0.0)
- .defaultProcessingRate(1000.0)
- .build();
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .defaultProcessingRate(1000.0)
+ .build();
// aggregateLag = 100000 * 100 = 10,000,000
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
@@ -113,18 +117,13 @@ public class WeightedCostFunctionTest
}
@Test
- public void testBalancedWeightsFavorStabilityOverScaleUp()
+ public void testBalancedWeightsFavorStabilityOverScaleUpOnSmallLag()
{
- // With the marginal lag model and corrected idle ratio, balanced weights
- // favor stability because idle cost increases significantly with more
tasks
- // This is intentional behavior: the algorithm is conservative about
scale-up.
- CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
-
+ // Validate idle ratio estimation and ensure balanced weights still favor
stability.
+ CostMetrics metrics = createMetrics(100.0, 10, 100, 0.3);
double costCurrent = costFunction.computeCost(metrics, 10,
config).totalCost();
double costScaleUp = costFunction.computeCost(metrics, 20,
config).totalCost();
- // With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from
- // scaling up dominates the lag recovery benefit
Assert.assertTrue(
"With balanced weights, staying at current count is cheaper than
scale-up",
costCurrent < costScaleUp
@@ -193,13 +192,13 @@ public class WeightedCostFunctionTest
// Use lag-only config to isolate the lag recovery time component
CostBasedAutoScalerConfig lagOnlyConfig =
CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(1.0)
- .idleWeight(0.0)
- .defaultProcessingRate(1000.0)
- .build();
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .defaultProcessingRate(1000.0)
+ .build();
double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount
+ 5, lagOnlyConfig).totalCost();
double costDown5 = costFunction.computeCost(metricsNoRate,
currentTaskCount - 5, lagOnlyConfig).totalCost();
@@ -218,13 +217,13 @@ public class WeightedCostFunctionTest
// Test that idle cost increases monotonically with task count.
// With fixed load, adding more tasks means each task has less work, so
idle increases.
CostBasedAutoScalerConfig idleOnlyConfig =
CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(0.0)
- .idleWeight(1.0)
- .defaultProcessingRate(1000.0)
- .build();
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.0)
+ .idleWeight(1.0)
+ .defaultProcessingRate(1000.0)
+ .build();
// Current: 10 tasks with 40% idle (60% busy)
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
@@ -244,13 +243,13 @@ public class WeightedCostFunctionTest
public void testIdleRatioClampingAtBoundaries()
{
CostBasedAutoScalerConfig idleOnlyConfig =
CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(0.0)
- .idleWeight(1.0)
- .defaultProcessingRate(1000.0)
- .build();
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.0)
+ .idleWeight(1.0)
+ .defaultProcessingRate(1000.0)
+ .build();
// Extreme scale-down: 10 tasks → 2 tasks with 40% idle
// busyFraction = 0.6, taskRatio = 0.2
@@ -275,13 +274,13 @@ public class WeightedCostFunctionTest
public void testIdleRatioWithMissingData()
{
CostBasedAutoScalerConfig idleOnlyConfig =
CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .lagWeight(0.0)
- .idleWeight(1.0)
- .defaultProcessingRate(1000.0)
- .build();
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.0)
+ .idleWeight(1.0)
+ .defaultProcessingRate(1000.0)
+ .build();
// Negative idle ratio indicates missing data → should default to 0.5
CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0);
@@ -296,7 +295,38 @@ public class WeightedCostFunctionTest
Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 3600 *
0.5, cost20, 0.0001);
}
- private CostMetrics createMetrics(double avgPartitionLag, int
currentTaskCount, int partitionCount, double pollIdleRatio)
+ @Test
+ public void testLagAmplificationReducesIdleUnderHighLag()
+ {
+ CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .defaultProcessingRate(1000.0)
+ .build();
+
+ int currentTaskCount = 3;
+ int proposedTaskCount = 8;
+ int partitionCount = 30;
+ double pollIdleRatio = 0.1;
+
+ CostMetrics lowLag = createMetrics(40_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
+ CostMetrics highLag = createMetrics(500_000.0, currentTaskCount,
partitionCount, pollIdleRatio);
+
+ double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount,
idleOnlyConfig).totalCost();
+ double highLagCost = costFunction.computeCost(highLag, proposedTaskCount,
idleOnlyConfig).totalCost();
+ Assert.assertTrue(
+ "Higher lag should reduce predicted idle more aggressively",
+ lowLagCost > highLagCost
+ );
+ }
+
+ private CostMetrics createMetrics(
+ double avgPartitionLag,
+ int currentTaskCount,
+ int partitionCount,
+ double pollIdleRatio
+ )
{
return new CostMetrics(
avgPartitionLag,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]