This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6e2f4726167 refactor: clean up min/max task count limits in streaming
task autoscaler (#19369)
6e2f4726167 is described below
commit 6e2f4726167a18989833bd62be1f1c9982df2c1c
Author: jtuglu1 <[email protected]>
AuthorDate: Tue May 19 16:34:29 2026 -0700
refactor: clean up min/max task count limits in streaming task autoscaler
(#19369)
---
.../supervisor/SeekableStreamSupervisor.java | 71 ++++++---
.../supervisor/autoscaler/CostBasedAutoScaler.java | 97 +++----------
.../supervisor/autoscaler/LagBasedAutoScaler.java | 95 ++----------
.../SeekableStreamSupervisorSpecTest.java | 109 +++++++++++---
.../SeekableStreamSupervisorStateTest.java | 160 ++++++++++++++++++++-
.../autoscaler/CostBasedAutoScalerMockTest.java | 129 ++++-------------
.../autoscaler/LagBasedAutoScalerTest.java | 14 +-
.../autoscaler/SupervisorTaskAutoScaler.java | 24 +++-
8 files changed, 388 insertions(+), 311 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 91b4244c0bf..9bef543496b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -540,29 +540,65 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final int desiredTaskCount = computeDesiredTaskCount.call();
final int currentTaskCount = getCurrentTaskCount();
- if (desiredTaskCount <= 0) {
- return;
- }
-
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
- // 1) This should already be handled by the auto-scaler
implementation, but make sure we catch/record these for auditability
- if (desiredTaskCount == currentTaskCount) {
+ if (desiredTaskCount <= 0) {
log.warn(
- "Skipping scaling for supervisor[%s] for dataSource[%s]:
already at desired task count [%d]",
+ "Auto-scaler returned pathological taskCount[%d] for
supervisor[%s] for dataSource[%s]; skipping scale.",
+ desiredTaskCount,
+ supervisorId,
+ dataSource
+ );
+ emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION,
"Auto-scaler failed to compute a task count")
+ .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
desiredTaskCount));
+ return;
+ }
+
+ final int partitionCount = getPartitionCount();
+ final int rawMin = autoScalerConfig.getTaskCountMin();
+ final int rawMax = autoScalerConfig.getTaskCountMax();
+ final int taskCountMin = partitionCount > 0 ? Math.min(rawMin,
partitionCount) : rawMin;
+ final int taskCountMax = partitionCount > 0 ? Math.min(rawMax,
partitionCount) : rawMax;
+ final int clampedTaskCount = Math.min(taskCountMax,
Math.max(taskCountMin, desiredTaskCount));
+
+ if (clampedTaskCount == currentTaskCount) {
+ // Don't emit on the steady-state no-op.
+ if (desiredTaskCount == currentTaskCount) {
+ log.debug(
+ "No scale action for supervisor[%s] for dataSource[%s]:
scaler wants [%d], current [%d].",
+ supervisorId,
+ dataSource,
+ desiredTaskCount,
+ currentTaskCount
+ );
+ return;
+ }
+
+ final String skipReason;
+ if (desiredTaskCount > taskCountMax) {
+ skipReason = "Already at max task count";
+ } else {
+ skipReason = "Already at min task count";
+ }
+ log.info(
+ "Skipping scaling for supervisor[%s] for dataSource[%s]: [%s]
(scaler wants [%d], current [%d], bounds [%d,%d])",
supervisorId,
dataSource,
- desiredTaskCount
+ skipReason,
+ desiredTaskCount,
+ currentTaskCount,
+ taskCountMin,
+ taskCountMax
);
- emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION,
"desired capacity reached")
+ emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION,
skipReason)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
desiredTaskCount));
return;
}
- // 2) Make sure we wait for any pending completion tasks to finish.
+ // Make sure we wait for any pending completion tasks to finish.
// At this point there could be 3 generations of tasks: pending
completion tasks (old generation), running tasks (current generation), and
(after our scale) pending tasks (new generation).
// We want to avoid killing any old generation tasks preemptively,
as that might cause the current generation tasks' offsets to become invalid.
for (CopyOnWriteArrayList<TaskGroup> list :
pendingCompletionTaskGroups.values()) {
@@ -583,7 +619,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
- // 3) Make sure we are not breaching any scaling cooldown limits.
+ // Make sure we are not breaching any scaling cooldown limits.
// Scaling operations are disruptive — scale-down in particular can
leave the supervisor
// under-resourced while it recovers from lag induced by the scale
event, so callers may
// configure a longer cooldown for scale-down than for scale-up.
Both directions are measured against the same
@@ -592,23 +628,24 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final ScaleDirection scaleDirection;
final long cooldownMillis;
- if (desiredTaskCount > currentTaskCount) {
+ if (clampedTaskCount > currentTaskCount) {
scaleDirection = ScaleDirection.SCALE_UP;
cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
- } else { // desiredTaskCount < currentTaskCount
+ } else { // clampedTaskCount < currentTaskCount
scaleDirection = ScaleDirection.SCALE_DOWN;
cooldownMillis =
autoScalerConfig.getMinScaleDownDelay().getMillis();
}
if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
log.info(
- "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s]
cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired
task count is [%d], current task count is [%d]",
+ "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s]
cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! scaler
wants [%d] (clamped [%d]), current task count is [%d]",
nowTime - dynamicTriggerLastScaleRunTime,
scaleDirection,
cooldownMillis,
supervisorId,
dataSource,
desiredTaskCount,
+ clampedTaskCount,
currentTaskCount
);
@@ -620,10 +657,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
- // At this point, we can reasonably attempt a scaling action, so
emit our required task count
+ // Emit the scaler's unclamped preferred count so operators see what
it wants.
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC,
desiredTaskCount));
- boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+ boolean allocationSuccess = changeTaskCount(clampedTaskCount);
if (allocationSuccess) {
onSuccessfulScale.run();
dynamicTriggerLastScaleRunTime = nowTime;
@@ -672,7 +709,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
final int currentActiveTaskCount = getCurrentTaskCount();
- if (desiredActiveTaskCount < 0 || desiredActiveTaskCount ==
currentActiveTaskCount) {
+ if (desiredActiveTaskCount <= 0 || desiredActiveTaskCount ==
currentActiveTaskCount) {
return false;
} else {
log.info(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9d50266cd73..7dc9dd0c2d6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -160,7 +160,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
if (config.isScaleDownOnTaskRolloverOnly()) {
return computeOptimalTaskCount(lastKnownMetrics);
} else {
- return -1;
+ return CANNOT_COMPUTE;
}
}
@@ -169,68 +169,24 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
lastKnownMetrics = collectMetrics();
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
- int currentTaskCount = supervisor.getIoConfig().getTaskCount();
-
- // Take the current task count but clamp it to the configured boundaries
if it is outside the boundaries.
- // There might be a configuration instance with a handwritten taskCount
that is outside the boundaries.
- final boolean isTaskCountOutOfBounds = currentTaskCount <
config.getTaskCountMin()
- || currentTaskCount >
config.getTaskCountMax();
- if (isTaskCountOutOfBounds) {
- currentTaskCount = Math.min(config.getTaskCountMax(),
Math.max(config.getTaskCountMin(), currentTaskCount));
+ if (optimalTaskCount <= 0) {
+ return CANNOT_COMPUTE;
}
- // Perform scale-up actions; scale-down actions only if configured.
- final int taskCount;
+ final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
- // If task count is out of bounds, scale to the configured boundary
- // regardless of optimal task count, to get back to a safe state.
- if (isTaskCountOutOfBounds) {
- taskCount = currentTaskCount;
- log.info(
- "Task count for supervisor[%s] was out of bounds [%d,%d], urgently
scaling from [%d] to [%d].",
- supervisorId, config.getTaskCountMin(), config.getTaskCountMax(),
currentTaskCount, currentTaskCount
- );
- } else if (optimalTaskCount > currentTaskCount) {
- taskCount = optimalTaskCount;
- log.info(
- "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
up).",
- supervisorId,
- currentTaskCount,
- taskCount
- );
- } else if (!config.isScaleDownOnTaskRolloverOnly()
- && optimalTaskCount < currentTaskCount
- && optimalTaskCount > 0) {
- taskCount = optimalTaskCount;
- log.info(
- "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
down).",
- supervisorId,
- currentTaskCount,
- taskCount
- );
- } else {
- taskCount = -1;
- log.debug("No scaling required for supervisor[%s]", supervisorId);
-
- // Emit metrics for scaling skip reasons; in case of min == max,
signaling reaching
- // max task count has bigger priority for the external observers /
trackers
- if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount ==
config.getTaskCountMax()) {
- emitter.emit(getMetricBuilder()
- .setDimension(
-
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
- "Already at max task count"
- )
-
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
currentTaskCount));
- } else if (optimalTaskCount == config.getTaskCountMin() ||
currentTaskCount == config.getTaskCountMin()) {
- emitter.emit(getMetricBuilder()
- .setDimension(
-
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
- "Already at min task count"
- )
-
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
currentTaskCount));
- }
+ // Rollover-only scale-down mode: don't proactively scale down here.
+ if (config.isScaleDownOnTaskRolloverOnly() && optimalTaskCount <
currentTaskCount) {
+ return currentTaskCount;
}
- return taskCount;
+
+ log.info(
+ "CostBasedAutoScaler for supervisor[%s] wants taskCount[%d]
(current[%d]).",
+ supervisorId,
+ optimalTaskCount,
+ currentTaskCount
+ );
+ return optimalTaskCount;
}
public CostBasedAutoScalerConfig getConfig()
@@ -239,15 +195,9 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
}
/**
- * Computes the optimal task count based on current metrics.
- * <p>
- * Returns -1 (no scaling needed) in the following cases:
- * <ul>
- * <li>Metrics are not available</li>
- * <li>Current task count already optimal</li>
- * </ul>
- *
- * @return optimal task count, or -1 if no scaling action is needed
+ * Returns the lowest-cost task count given {@code metrics}, or {@link
#CANNOT_COMPUTE} when
+ * metrics are unusable. Returning the current task count means the current
count is already
+ * optimal (or no better candidate could be evaluated).
*/
int computeOptimalTaskCount(CostMetrics metrics)
{
@@ -259,13 +209,13 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
.setDimension(DruidMetrics.DESCRIPTION, result.error())
.setMetric(INVALID_METRICS_COUNT, 1L)
);
- return -1;
+ return CANNOT_COMPUTE;
}
final int partitionCount = metrics.getPartitionCount();
final int currentTaskCount = metrics.getCurrentTaskCount();
if (partitionCount <= 0 || currentTaskCount <= 0) {
- return -1;
+ return CANNOT_COMPUTE;
}
final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
@@ -276,8 +226,9 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
);
if (validTaskCounts.length == 0) {
+ // Return current count (not an error) so the supervisor can clamp it
back into bounds.
log.warn("No valid task counts after applying constraints for
supervisor[%s]", supervisorId);
- return -1;
+ return currentTaskCount;
}
// Start with the current task count as optimal
@@ -334,9 +285,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
emitter.emit(getMetricBuilder().setMetric(LAG_COST_METRIC,
optimalCost.lagCost()));
emitter.emit(getMetricBuilder().setMetric(IDLE_COST_METRIC,
optimalCost.idleCost()));
- if (optimalTaskCount == currentTaskCount) {
- return -1;
- } else {
+ if (optimalTaskCount != currentTaskCount) {
log.info(
"Optimal taskCount[%d] for supervisor[%s] has lowest cost[%.4f] out
of the following candidates: %n%s",
optimalTaskCount, supervisorId, optimalCost.totalCost(),
constructCostTable(validTaskCounts, costResults)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index b96a1de7bd4..adb041911af 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -31,8 +31,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.query.DruidMetrics;
import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
@@ -52,7 +50,6 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
private final SeekableStreamSupervisor supervisor;
private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
private final ServiceEmitter emitter;
- private final ServiceMetricEvent.Builder metricBuilder;
private static final ReentrantLock LOCK = new ReentrantLock(true);
@@ -77,10 +74,6 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
this.spec = spec;
this.supervisor = supervisor;
this.emitter = emitter;
- metricBuilder = ServiceMetricEvent.builder()
-
.setDimension(DruidMetrics.SUPERVISOR_ID, spec.getId())
- .setDimension(DruidMetrics.DATASOURCE,
dataSource)
- .setDimension(DruidMetrics.STREAM,
this.supervisor.getIoConfig().getStream());
}
@Override
@@ -88,7 +81,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
{
Callable<Integer> scaleAction = () -> {
LOCK.lock();
- int desiredTaskCount = -1;
+ int desiredTaskCount = CANNOT_COMPUTE;
try {
desiredTaskCount = computeDesiredTaskCount(new
ArrayList<>(lagMetricsQueue));
}
@@ -197,27 +190,9 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
};
}
- /**
- * This method determines whether to do scale actions based on collected lag
points.
- * The current algorithm of scale is straightforward:
- * <ul>
- * <li>First, compute the proportion of lag points higher/lower than {@code
scaleOutThreshold/scaleInThreshold},
- * getting {@code scaleInThreshold/scaleOutThreshold},.
- * <li>Secondly, compare {@code scaleInThreshold/scaleOutThreshold} with
- * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
- * <ul><li>P.S. Scale out action has a higher priority than scale in
action.</ul>
- * <li>Finally, if {@code scaleOutThreshold/scaleInThreshold}, is higher than
- * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold},
scale out/in action would be triggered.
- * </ul>
- *
- * @param lags the lag metrics of Stream (Kafka/Kinesis)
- * @return Integer, target number of tasksCount. -1 means skip scale action.
- */
@VisibleForTesting
int computeDesiredTaskCount(List<Long> lags)
{
- // if the supervisor is not suspended, ensure required tasks are running
- // if suspended, ensure tasks have been requested to gracefully stop
log.debug(
"Computing the desired task count for supervisor[%s], based on
following lags : [%s]",
spec.getId(),
@@ -237,74 +212,30 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
double beyondProportion = beyond * 1.0 / metricsCount;
double withinProportion = within * 1.0 / metricsCount;
- log.debug("Calculated beyondProportion is [%s] and withinProportion is
[%s] for supervisor[%s].", beyondProportion,
- withinProportion, spec.getId()
+ log.debug(
+ "Calculated beyondProportion is [%s] and withinProportion is [%s] for
supervisor[%s].",
+ beyondProportion,
+ withinProportion,
+ spec.getId()
);
- int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
- int desiredActiveTaskCount;
final int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?",
spec.getId());
- return -1;
+ return CANNOT_COMPUTE;
}
- final int actualTaskCountMax =
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
- final int actualTaskCountMin =
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
-
- // Take the current task count but clamp it to the configured boundaries
if it is outside the boundaries.
- // There might be a configuration instance with a handwritten taskCount
that is outside the boundaries.
- // If that is happening, take the bound and return early.
- final boolean isTaskCountOutOfBounds = currentActiveTaskCount <
actualTaskCountMin
- || currentActiveTaskCount >
actualTaskCountMax;
- if (isTaskCountOutOfBounds) {
- currentActiveTaskCount = Math.min(actualTaskCountMax,
Math.max(actualTaskCountMin, currentActiveTaskCount));
- return currentActiveTaskCount;
- }
+ final int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
if (beyondProportion >=
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
- // Do Scale out
- final int taskCount = currentActiveTaskCount +
lagBasedAutoScalerConfig.getScaleOutStep();
- if (currentActiveTaskCount == actualTaskCountMax) {
- log.debug(
- "CurrentActiveTaskCount reached task count Max limit, skipping
scale out action for supervisor[%s].",
- spec.getId()
- );
- emitter.emit(metricBuilder
- .setDimension(
-
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
- "Already at max task count"
- )
-
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
taskCount));
- return -1;
- } else {
- desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
- }
- return desiredActiveTaskCount;
+ // scale-out: step up from current, capped by partition count
(scaler-internal constraint)
+ return Math.min(currentActiveTaskCount +
lagBasedAutoScalerConfig.getScaleOutStep(), partitionCount);
}
-
if (withinProportion >=
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
- // Do Scale in
- final int taskCount = currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep();
- if (currentActiveTaskCount == actualTaskCountMin) {
- log.debug(
- "CurrentActiveTaskCount reached task count Min limit[%d], skipping
scale in action for supervisor[%s].",
- actualTaskCountMin,
- spec.getId()
- );
- emitter.emit(metricBuilder
- .setDimension(
-
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
- "Already at min task count"
- )
-
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
taskCount));
- return -1;
- } else {
- desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
- }
- return desiredActiveTaskCount;
+ return Math.max(1, currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep());
}
- return -1;
+ // Neither trigger fired; the scaler's preferred count is the current
count.
+ return currentActiveTaskCount;
}
public LagBasedAutoScalerConfig getAutoScalerConfig()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 3d0c6426feb..8d1b5350e8c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -400,8 +400,11 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ // Use taskCountMax=10 so both the scaler and the supervisor's bounds
agree and multiple
+ // scale-out iterations have headroom before hitting max (which is what
lets us observe the
+ // "Scale cooldown not elapsed yet" skip reason emitted by the supervisor
on later iterations).
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true,
10)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -457,7 +460,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2,
true)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -510,7 +513,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -559,7 +562,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -606,7 +609,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2,
false)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(false)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -655,10 +658,42 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
@Test
public void
testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions()
throws InterruptedException
{
+ // Verifies that when the operator misconfigures taskCountMin above
partitionCount, the
+ // supervisor's partition-count ceiling brings both bounds down to
partitionCount and any
+ // scale action settles at partitionCount.
+ final Map<String, Object> misconfiguredProps = getScaleInProperties();
+ misconfiguredProps.put("taskCountMax", 20);
+ misconfiguredProps.put("taskCountMin", 15);
+ final AutoScalerConfig misconfiguredAutoScalerConfig =
+ mapper.convertValue(misconfiguredProps, AutoScalerConfig.class);
+
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2,
false)).anyTimes();
+ // Use an ioConfig whose autoScalerConfig matches the test's intent
(min=15, max=20) so the
+ // supervisor's bounds and the scaler's bounds agree.
+ EasyMock.expect(spec.getIoConfig()).andReturn(new
SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ 1,
+ null,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ misconfiguredAutoScalerConfig,
+ LagAggregator.DEFAULT,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ {
+ }).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -674,33 +709,29 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
- Map<String, Object> modifiedScaleInProps = getScaleInProperties();
-
- modifiedScaleInProps.put("taskCountMax", 20);
- modifiedScaleInProps.put("taskCountMin", 15);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
mapper.convertValue(
- modifiedScaleInProps,
+ misconfiguredProps,
LagBasedAutoScalerConfig.class
),
spec,
emitter
);
- // enable autoscaler so that taskcount config will be ignored and the init
value of taskCount will use taskCountMin.
- Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+ // Initial taskCount comes from the ioConfig's
autoScalerConfig.taskCountMin (15) since
+ // taskCount was passed null in the ioConfig above.
+ Assert.assertEquals(15, (int) supervisor.getIoConfig().getTaskCount());
supervisor.getIoConfig().setTaskCount(2);
- // When
supervisor.start();
autoScaler.start();
supervisor.runInternal();
Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
Thread.sleep(2000);
- // Then
+ // Supervisor caps min/max at partitionCount=10, so the first scale
settles at partitionCount.
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC,
1);
@@ -715,8 +746,34 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ // taskCountMin=2 so scaler's floored output (1) is below min and triggers
the clamp.
+ final Map<String, Object> scaleInProps = getScaleInProperties();
+ scaleInProps.put("taskCountMin", 2);
+ final SeekableStreamSupervisorIOConfig customIoConfig = new
SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ 1,
+ null,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ mapper.convertValue(scaleInProps, AutoScalerConfig.class),
+ LagAggregator.DEFAULT,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ {
+ };
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(customIoConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -737,19 +794,20 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
@Override
public int getActiveTaskGroupsCount()
{
- return 1;
+ return 2;
}
};
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
mapper.convertValue(
- getScaleInProperties(),
+ scaleInProps,
LagBasedAutoScalerConfig.class
),
spec,
dynamicActionEmitter
);
+ supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
@@ -1180,7 +1238,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -1246,7 +1304,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
// Suspended → DynamicAllocationTasksNotice should return early and not
scale
@@ -1294,7 +1352,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1,
true)).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -1452,7 +1510,12 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.replay(ingestionSchema);
}
- private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean
scaleOut)
+ private SeekableStreamSupervisorIOConfig getIOConfig(boolean scaleOut)
+ {
+ return getIOConfig(scaleOut, 2);
+ }
+
+ private SeekableStreamSupervisorIOConfig getIOConfig(boolean scaleOut, int
maxTaskCount)
{
if (scaleOut) {
return new SeekableStreamSupervisorIOConfig(
@@ -1467,7 +1530,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
new Period("PT30M"),
null,
null,
- mapper.convertValue(getScaleOutProperties(2),
AutoScalerConfig.class),
+ mapper.convertValue(getScaleOutProperties(maxTaskCount),
AutoScalerConfig.class),
LagAggregator.DEFAULT,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index eff5d1acd98..d61049777c8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3546,6 +3546,18 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
{
return state;
}
+
+ /**
+ * The shared record-supplier mock in this test returns a single-partition stream, which would
+ * otherwise cause the supervisor's partition-count ceiling in DynamicAllocationTasksNotice to
+ * clamp every desired task count in the cooldown tests to 1. Report a large partition count so
+ * the cooldown tests can exercise bounds at the autoscaler-config level only.
+ */
+ @Override
+ public int getPartitionCount()
+ {
+ return 1_000;
+ }
}
private class TestEmittingTestSeekableStreamSupervisor extends
BaseTestSeekableStreamSupervisor
@@ -4030,6 +4042,125 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
assertScaleSkipped(events.get(1), 1, "Scale cooldown not elapsed yet");
}
+ @Test
+ public void testDynamicAllocationClampsDesiredAboveMaxToMax()
+ {
+ // Scaler returns 20, but taskCountMax=10. Supervisor must clamp and scale
to 10.
+ final StubServiceEmitter scalingEmitter =
+ setupSupervisorForAutoScalingTest(0L, 0L, 3, 1, 10);
+ final TestSeekableStreamSupervisor supervisor =
+ new
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ supervisor.handleDynamicAllocationTasksNotice(() -> 20, () -> {},
scalingEmitter);
+
+ // Task count is clamped to max (10), not the scaler's desired (20).
+ Assert.assertEquals(10, supervisor.getIoConfig().getTaskCount());
+
+ final List<ServiceMetricEvent> events =
+
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals(1, events.size());
+ // The emitted metric value reflects the scaler's unclamped desired task count (the operator
+ // hint), not the clamped value the supervisor actually applied.
+ assertScaledToTaskCount(events.get(0), 20);
+ }
+
+ @Test
+ public void testDynamicAllocationClampsDesiredBelowMinToMin()
+ {
+ // Scaler returns 1, but taskCountMin=3. Supervisor must clamp and scale
to 3.
+ final StubServiceEmitter scalingEmitter =
+ setupSupervisorForAutoScalingTest(0L, 0L, 5, 3, 10);
+ final TestSeekableStreamSupervisor supervisor =
+ new
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {},
scalingEmitter);
+
+ Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount());
+
+ final List<ServiceMetricEvent> events =
+
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals(1, events.size());
+ assertScaledToTaskCount(events.get(0), 1);
+ }
+
+ @Test
+ public void
testDynamicAllocationEmitsAlreadyAtMaxWhenCurrentIsAtMaxAndDesiredAboveMax()
+ {
+ // Current (10) is already at configured max (10). Scaler wants 15 (above
max). Supervisor
+ // clamps to 10 which equals current -> emits "Already at max task count"
skip reason.
+ final StubServiceEmitter scalingEmitter =
+ setupSupervisorForAutoScalingTest(0L, 0L, 10, 1, 10);
+ final TestSeekableStreamSupervisor supervisor =
+ new
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ supervisor.handleDynamicAllocationTasksNotice(() -> 15, () -> {},
scalingEmitter);
+
+ Assert.assertEquals("Task count must not change when at max", 10,
supervisor.getIoConfig().getTaskCount());
+
+ final List<ServiceMetricEvent> events =
+
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals(1, events.size());
+ assertScaleSkipped(events.get(0), 15, "Already at max task count");
+ }
+
+ @Test
+ public void
testDynamicAllocationEmitsAlreadyAtMinWhenCurrentIsAtMinAndDesiredBelowMin()
+ {
+ // Current (3) is already at configured min (3). Scaler wants 1 (below
min). Supervisor clamps
+ // to 3 which equals current -> emits "Already at min task count" skip
reason.
+ final StubServiceEmitter scalingEmitter =
+ setupSupervisorForAutoScalingTest(0L, 0L, 3, 3, 10);
+ final TestSeekableStreamSupervisor supervisor =
+ new
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {},
scalingEmitter);
+
+ Assert.assertEquals("Task count must not change when at min", 3,
supervisor.getIoConfig().getTaskCount());
+
+ final List<ServiceMetricEvent> events =
+
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals(1, events.size());
+ assertScaleSkipped(events.get(0), 1, "Already at min task count");
+ }
+
+ @Test
+ public void testDynamicAllocationEmitsNothingWhenDesiredEqualsCurrent()
+ {
+ // No skip metric in the steady-state no-op (avoid emitting on every tick).
+ final StubServiceEmitter scalingEmitter =
+ setupSupervisorForAutoScalingTest(0L, 0L, 5, 1, 10);
+ final TestSeekableStreamSupervisor supervisor =
+ new
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {},
scalingEmitter);
+
+ Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount());
+
+ final List<ServiceMetricEvent> events =
+
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertTrue("No metric should be emitted in the steady-state no-op
case", events.isEmpty());
+ }
+
+ @Test
+ public void
testDynamicAllocationEmitsPathologicalSkipReasonWhenScalerReturnsNonPositive()
+ {
+ // Scaler contract: a non-positive return means "I could not compute a
useful answer".
+ // Supervisor must not scale and must emit a skip reason surfacing the
scaler's failure.
+ final StubServiceEmitter scalingEmitter =
+ setupSupervisorForAutoScalingTest(0L, 0L, 5, 1, 10);
+ final TestSeekableStreamSupervisor supervisor =
+ new
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ supervisor.handleDynamicAllocationTasksNotice(() -> -1, () -> {},
scalingEmitter);
+
+ Assert.assertEquals("Task count must not change on pathological return",
5, supervisor.getIoConfig().getTaskCount());
+
+ final List<ServiceMetricEvent> events =
+
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals(1, events.size());
+ assertScaleSkipped(events.get(0), -1, "Auto-scaler failed to compute a
task count");
+ }
+
/**
* Asserts that a required-tasks emission represents a scale event: it
carries the standard
* supervisor/datasource/stream dims, no scalingSkipReason dim, and the
metric value matches the
@@ -4096,10 +4227,23 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
long minScaleDownDelayMillis,
int initialTaskCount
)
+ {
+ return setupSupervisorForAutoScalingTest(minScaleUpDelayMillis,
minScaleDownDelayMillis, initialTaskCount, 1, 100);
+ }
+
+ private StubServiceEmitter setupSupervisorForAutoScalingTest(
+ long minScaleUpDelayMillis,
+ long minScaleDownDelayMillis,
+ int initialTaskCount,
+ int taskCountMin,
+ int taskCountMax
+ )
{
final AutoScalerConfig autoScalerConfig = testAutoScalerConfig(
minScaleUpDelayMillis,
- minScaleDownDelayMillis
+ minScaleDownDelayMillis,
+ taskCountMin,
+ taskCountMax
);
final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(
initialTaskCount,
@@ -4113,6 +4257,16 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
* Returns a minimal test-only {@link AutoScalerConfig}
*/
private static AutoScalerConfig testAutoScalerConfig(long
minScaleUpDelayMillis, long minScaleDownDelayMillis)
+ {
+ return testAutoScalerConfig(minScaleUpDelayMillis,
minScaleDownDelayMillis, 1, 100);
+ }
+
+ private static AutoScalerConfig testAutoScalerConfig(
+ long minScaleUpDelayMillis,
+ long minScaleDownDelayMillis,
+ int taskCountMin,
+ int taskCountMax
+ )
{
return new AutoScalerConfig()
{
@@ -4143,13 +4297,13 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
@Override
public int getTaskCountMax()
{
- return 100;
+ return taskCountMax;
}
@Override
public int getTaskCountMin()
{
- return 1;
+ return taskCountMin;
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 6466b0966e8..129569864f9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -23,13 +23,10 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.List;
@@ -38,7 +35,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CostBasedAutoScalerMockTest
@@ -97,8 +93,10 @@ public class CostBasedAutoScalerMockTest
}
@Test
- public void testNoOpWhenOptimalEqualsCurrent()
+ public void testReturnsOptimalWhenOptimalEqualsCurrent()
{
+ // Scaler contract: return the optimal count the scaler wants, regardless
of current/bounds.
+ // The supervisor handles the "equal to current -> no scale" decision.
CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
int currentTaskCount = 25;
@@ -109,7 +107,7 @@ public class CostBasedAutoScalerMockTest
int result = autoScaler.computeTaskCountForScaleAction();
- Assert.assertEquals("Should return -1 when it equals current (no change
needed)", -1, result);
+ Assert.assertEquals("Scaler should return its optimal count even when it
equals current", optimalCount, result);
}
@Test
@@ -176,8 +174,10 @@ public class CostBasedAutoScalerMockTest
}
@Test
- public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+ public void testReturnsUnclampedOptimalBelowMin()
{
+ // Scaler no longer clamps to the autoScalerConfig bounds — the supervisor
does.
+ // Verify the scaler returns the raw optimal even when it is below
taskCountMin.
CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(50)
@@ -186,25 +186,25 @@ public class CostBasedAutoScalerMockTest
CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
final int configuredTaskCount = 1;
- final int taskCountMin = 50;
+ final int belowMinOptimal = 49; // 1 below taskCountMin; expect unchanged
through scaler
- // Mock computeOptimalTaskCount to return a value different from the
boundary,
- // so the assertion proves the boundary clamping path was taken.
- doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any());
+ doReturn(belowMinOptimal).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0,
0.2);
final int result = autoScaler.computeTaskCountForScaleAction();
Assert.assertEquals(
- "Should scale to taskCountMin when the configured task count is below
the minimum boundary",
- taskCountMin,
+ "Scaler should return unclamped optimal; clamping is a supervisor
concern",
+ belowMinOptimal,
result
);
}
@Test
- public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+ public void testReturnsUnclampedOptimalAboveMax()
{
+ // Scaler no longer clamps to the autoScalerConfig bounds — the supervisor
does.
+ // Verify the scaler returns the raw optimal even when it is above
taskCountMax.
CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
.taskCountMax(50)
.taskCountMin(1)
@@ -213,18 +213,16 @@ public class CostBasedAutoScalerMockTest
CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
final int configuredTaskCount = 100;
- final int taskCountMax = 50;
+ final int aboveMaxOptimal = 51; // 1 above taskCountMax; expect unchanged
through scaler
- // Mock computeOptimalTaskCount to return a value different from the
boundary,
- // so the assertion proves the boundary clamping path was taken.
- doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any());
+ doReturn(aboveMaxOptimal).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);
final int result = autoScaler.computeTaskCountForScaleAction();
Assert.assertEquals(
- "Should scale to taskCountMax when the configured task count is above
the maximum boundary",
- taskCountMax,
+ "Scaler should return unclamped optimal; clamping is a supervisor
concern",
+ aboveMaxOptimal,
result
);
}
@@ -292,6 +290,9 @@ public class CostBasedAutoScalerMockTest
@Test
public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled()
{
+ // When scaleDownDuringTaskRolloverOnly is true and the optimal would be a
scale-down, the
+ // scaler's "preferred" count is to stay put — it signals that by
returning the current count.
+ // The supervisor interprets equal-to-current as a steady-state no-op and
skips silently.
CostBasedAutoScalerConfig rolloverOnlyConfig =
CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(1)
@@ -314,8 +315,8 @@ public class CostBasedAutoScalerMockTest
setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
Assert.assertEquals(
- "Should return -1 when scaleDownDuringTaskRolloverOnly is true",
- -1,
+ "Should return current count (no-op signal) when
scaleDownDuringTaskRolloverOnly suppresses the scale-down",
+ currentTaskCount,
autoScaler.computeTaskCountForScaleAction()
);
}
@@ -354,88 +355,8 @@ public class CostBasedAutoScalerMockTest
);
}
- @Test
- public void testEmitsMaxTaskCountSkipReasonWhenCurrentIsAtMax()
- {
- CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(10)
-
.taskCountMin(1)
-
.enableTaskAutoScaler(true)
-
.build();
- CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
- final int currentTaskCount = 10; // already at max
- doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
- Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
- @SuppressWarnings("unchecked")
- ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
- verify(mockEmitter).emit(captor.capture());
- Assert.assertEquals(
- "Should emit 'Already at max task count' skip reason when current task
count is at maximum",
- "Already at max task count",
- ((ServiceMetricEvent.Builder) captor.getValue())
-
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
- );
- }
-
- @Test
- public void testEmitsMinTaskCountSkipReasonWhenCurrentIsAtMin()
- {
- CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(100)
-
.taskCountMin(10)
-
.enableTaskAutoScaler(true)
-
.build();
- CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
- final int currentTaskCount = 10; // already at min
- doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
- Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
- @SuppressWarnings("unchecked")
- ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
- verify(mockEmitter).emit(captor.capture());
- Assert.assertEquals(
- "Should emit 'Already at min task count' skip reason when current task
count is at minimum",
- "Already at min task count",
- ((ServiceMetricEvent.Builder) captor.getValue())
-
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
- );
- }
-
- @Test
- public void testMaxSkipReasonTakesPriorityWhenMinEqualsMax()
- {
- // When min == max, current is simultaneously at both bounds.
- // The comment in the production code states that signaling max has higher
priority.
- CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
-
.taskCountMax(5)
-
.taskCountMin(5)
-
.enableTaskAutoScaler(true)
-
.build();
- CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
- final int currentTaskCount = 5; // at both min and max
- doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
- Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
- @SuppressWarnings("unchecked")
- ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
- verify(mockEmitter).emit(captor.capture());
- Assert.assertEquals(
- "Max skip reason should take priority over min skip reason when min
equals max",
- "Already at max task count",
- ((ServiceMetricEvent.Builder) captor.getValue())
-
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
- );
- }
+ // Skip-reason emissions ("Already at max/min task count") moved to
SeekableStreamSupervisor —
+ // see SeekableStreamSupervisorStateTest for those assertions.
private void setupMocksForMetricsCollection(
CostBasedAutoScaler autoScaler,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
index ffbfee77b72..32e2f378fdb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -94,20 +94,28 @@ public class LagBasedAutoScalerTest
}
@Test
- public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+ public void testReturnsUnclampedScaleOutStepEvenWhenCurrentIsBelowMin()
{
+ // Scaler no longer clamps to taskCountMin; it just steps the current
count up by scaleOutStep.
+ // The supervisor is responsible for clamping below-min results up to
taskCountMin.
when(mockIoConfig.getTaskCount()).thenReturn(1);
when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
- Assert.assertEquals(50,
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
+ // current (1) + scaleOutStep (4) = 5 — below configured taskCountMin
(50); not clamped here.
+ Assert.assertEquals(5,
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
}
@Test
- public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+ public void testReturnsUnclampedScaleInStepEvenWhenCurrentIsAboveMax()
{
+ // Scaler no longer clamps to taskCountMax; it just steps the current
count down by scaleInStep.
+ // The supervisor is responsible for clamping above-max results down to
taskCountMax.
when(mockIoConfig.getTaskCount()).thenReturn(101);
when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+ // current (101) - scaleInStep (1) = 100. The current count is above taskCountMax (100), but
+ // the scaler does not clamp; it returns its raw computation. (In this particular case the raw
+ // result merely happens to equal taskCountMax.)
Assert.assertEquals(100,
createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L)));
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
index 17cd347231a..b222c4221dd 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -19,20 +19,34 @@
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+/**
+ * Task-count auto-scaler driven by a streaming supervisor.
+ * <p>
+ * Return-value contract for {@link #computeTaskCountForRollover()} and any
implementation-specific
+ * scale-action method:
+ * <ul>
+ * <li>{@link #CANNOT_COMPUTE} when no preferred count can be computed.</li>
+ * <li>Otherwise, a preferred task count {@code >= 1}, unclamped by min/max
bounds. The supervisor
+ * clamps to {@code [taskCountMin, taskCountMax]} and decides whether to
scale. Scaling to
+ * zero (idle) is the supervisor's responsibility, not the
autoscaler's.</li>
+ * </ul>
+ */
public interface SupervisorTaskAutoScaler
{
+ /** Sentinel for "no preferred task count available". The supervisor will
skip scaling. */
+ int CANNOT_COMPUTE = -1;
+
void start();
void stop();
void reset();
/**
- * Computes the optimal task count during task rollover, allowing a
non-disruptive scale-down.
- * Must be called by the supervisor when tasks are ending their duration.
- *
- * @return optimal task count for scale-down, or -1 if no change needed
+ * Preferred task count when a task group is rolling over, allowing a
non-disruptive scale-down.
+ * Called by the supervisor when tasks are ending their duration. See the
type-level Javadoc for
+ * the return-value contract.
*/
default int computeTaskCountForRollover()
{
- return -1;
+ return CANNOT_COMPUTE;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]