This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2f4726167 refactor: clean up min/max task count limits in streaming task autoscaler (#19369)
6e2f4726167 is described below

commit 6e2f4726167a18989833bd62be1f1c9982df2c1c
Author: jtuglu1 <[email protected]>
AuthorDate: Tue May 19 16:34:29 2026 -0700

    refactor: clean up min/max task count limits in streaming task autoscaler (#19369)
---
 .../supervisor/SeekableStreamSupervisor.java       |  71 ++++++---
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  97 +++----------
 .../supervisor/autoscaler/LagBasedAutoScaler.java  |  95 ++----------
 .../SeekableStreamSupervisorSpecTest.java          | 109 +++++++++++---
 .../SeekableStreamSupervisorStateTest.java         | 160 ++++++++++++++++++++-
 .../autoscaler/CostBasedAutoScalerMockTest.java    | 129 ++++-------------
 .../autoscaler/LagBasedAutoScalerTest.java         |  14 +-
 .../autoscaler/SupervisorTaskAutoScaler.java       |  24 +++-
 8 files changed, 388 insertions(+), 311 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 91b4244c0bf..9bef543496b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -540,29 +540,65 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           final int desiredTaskCount = computeDesiredTaskCount.call();
           final int currentTaskCount = getCurrentTaskCount();
 
-          if (desiredTaskCount <= 0) {
-            return;
-          }
-
           ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
                                                                
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
                                                                
.setDimension(DruidMetrics.DATASOURCE, dataSource)
                                                                
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
 
-          // 1) This should already be handled by the auto-scaler 
implementation, but make sure we catch/record these for auditability
-          if (desiredTaskCount == currentTaskCount) {
+          if (desiredTaskCount <= 0) {
             log.warn(
-                "Skipping scaling for supervisor[%s] for dataSource[%s]: 
already at desired task count [%d]",
+                "Auto-scaler returned pathological taskCount[%d] for 
supervisor[%s] for dataSource[%s]; skipping scale.",
+                desiredTaskCount,
+                supervisorId,
+                dataSource
+            );
+            emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, 
"Auto-scaler failed to compute a task count")
+                             .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
+            return;
+          }
+
+          final int partitionCount = getPartitionCount();
+          final int rawMin = autoScalerConfig.getTaskCountMin();
+          final int rawMax = autoScalerConfig.getTaskCountMax();
+          final int taskCountMin = partitionCount > 0 ? Math.min(rawMin, 
partitionCount) : rawMin;
+          final int taskCountMax = partitionCount > 0 ? Math.min(rawMax, 
partitionCount) : rawMax;
+          final int clampedTaskCount = Math.min(taskCountMax, 
Math.max(taskCountMin, desiredTaskCount));
+
+          if (clampedTaskCount == currentTaskCount) {
+            // Don't emit on the steady-state no-op.
+            if (desiredTaskCount == currentTaskCount) {
+              log.debug(
+                  "No scale action for supervisor[%s] for dataSource[%s]: 
scaler wants [%d], current [%d].",
+                  supervisorId,
+                  dataSource,
+                  desiredTaskCount,
+                  currentTaskCount
+              );
+              return;
+            }
+
+            final String skipReason;
+            if (desiredTaskCount > taskCountMax) {
+              skipReason = "Already at max task count";
+            } else {
+              skipReason = "Already at min task count";
+            }
+            log.info(
+                "Skipping scaling for supervisor[%s] for dataSource[%s]: [%s] 
(scaler wants [%d], current [%d], bounds [%d,%d])",
                 supervisorId,
                 dataSource,
-                desiredTaskCount
+                skipReason,
+                desiredTaskCount,
+                currentTaskCount,
+                taskCountMin,
+                taskCountMax
             );
-            emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, 
"desired capacity reached")
+            emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, 
skipReason)
                              .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
             return;
           }
 
-          // 2) Make sure we wait for any pending completion tasks to finish.
+          // Make sure we wait for any pending completion tasks to finish.
           // At this point there could be 3 generations of tasks: pending 
completion tasks (old generation), running tasks (current generation), and 
(after our scale) pending tasks (new generation).
           // We want to avoid killing any old generation tasks preemptively, 
as that might cause the current generation tasks' offsets to become invalid.
           for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
@@ -583,7 +619,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             }
           }
 
-          // 3) Make sure we are not breaching any scaling cooldown limits.
+          // Make sure we are not breaching any scaling cooldown limits.
           // Scaling operations are disruptive — scale-down in particular can 
leave the supervisor
           // under-resourced while it recovers from lag induced by the scale 
event, so callers may
           // configure a longer cooldown for scale-down than for scale-up. 
Both directions are measured against the same
@@ -592,23 +628,24 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           final ScaleDirection scaleDirection;
           final long cooldownMillis;
 
-          if (desiredTaskCount > currentTaskCount) {
+          if (clampedTaskCount > currentTaskCount) {
             scaleDirection = ScaleDirection.SCALE_UP;
             cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
-          } else { // desiredTaskCount < currentTaskCount
+          } else { // clampedTaskCount < currentTaskCount
             scaleDirection = ScaleDirection.SCALE_DOWN;
             cooldownMillis = 
autoScalerConfig.getMinScaleDownDelay().getMillis();
           }
 
           if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
             log.info(
-                "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] 
cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired 
task count is [%d], current task count is [%d]",
+                "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] 
cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! scaler 
wants [%d] (clamped [%d]), current task count is [%d]",
                 nowTime - dynamicTriggerLastScaleRunTime,
                 scaleDirection,
                 cooldownMillis,
                 supervisorId,
                 dataSource,
                 desiredTaskCount,
+                clampedTaskCount,
                 currentTaskCount
             );
 
@@ -620,10 +657,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             return;
           }
 
-          // At this point, we can reasonably attempt a scaling action, so 
emit our required task count
+          // Emit the scaler's unclamped preferred count so operators see what 
it wants.
           emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
 
-          boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+          boolean allocationSuccess = changeTaskCount(clampedTaskCount);
           if (allocationSuccess) {
             onSuccessfulScale.run();
             dynamicTriggerLastScaleRunTime = nowTime;
@@ -672,7 +709,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   {
     final int currentActiveTaskCount = getCurrentTaskCount();
 
-    if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == 
currentActiveTaskCount) {
+    if (desiredActiveTaskCount <= 0 || desiredActiveTaskCount == 
currentActiveTaskCount) {
       return false;
     } else {
       log.info(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9d50266cd73..7dc9dd0c2d6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -160,7 +160,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     if (config.isScaleDownOnTaskRolloverOnly()) {
       return computeOptimalTaskCount(lastKnownMetrics);
     } else {
-      return -1;
+      return CANNOT_COMPUTE;
     }
   }
 
@@ -169,68 +169,24 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     lastKnownMetrics = collectMetrics();
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    int currentTaskCount = supervisor.getIoConfig().getTaskCount();
-
-    // Take the current task count but clamp it to the configured boundaries 
if it is outside the boundaries.
-    // There might be a configuration instance with a handwritten taskCount 
that is outside the boundaries.
-    final boolean isTaskCountOutOfBounds = currentTaskCount < 
config.getTaskCountMin()
-                                           || currentTaskCount > 
config.getTaskCountMax();
-    if (isTaskCountOutOfBounds) {
-      currentTaskCount = Math.min(config.getTaskCountMax(), 
Math.max(config.getTaskCountMin(), currentTaskCount));
+    if (optimalTaskCount <= 0) {
+      return CANNOT_COMPUTE;
     }
 
-    // Perform scale-up actions; scale-down actions only if configured.
-    final int taskCount;
+    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
 
-    // If task count is out of bounds, scale to the configured boundary
-    // regardless of optimal task count, to get back to a safe state.
-    if (isTaskCountOutOfBounds) {
-      taskCount = currentTaskCount;
-      log.info(
-          "Task count for supervisor[%s] was out of bounds [%d,%d], urgently 
scaling from [%d] to [%d].",
-          supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), 
currentTaskCount, currentTaskCount
-      );
-    } else if (optimalTaskCount > currentTaskCount) {
-      taskCount = optimalTaskCount;
-      log.info(
-          "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
up).",
-          supervisorId,
-          currentTaskCount,
-          taskCount
-      );
-    } else if (!config.isScaleDownOnTaskRolloverOnly()
-               && optimalTaskCount < currentTaskCount
-               && optimalTaskCount > 0) {
-      taskCount = optimalTaskCount;
-      log.info(
-          "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
down).",
-          supervisorId,
-          currentTaskCount,
-          taskCount
-      );
-    } else {
-      taskCount = -1;
-      log.debug("No scaling required for supervisor[%s]", supervisorId);
-
-      // Emit metrics for scaling skip reasons; in case of min == max, 
signaling reaching
-      // max task count has bigger priority for the external observers / 
trackers
-      if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount == 
config.getTaskCountMax()) {
-        emitter.emit(getMetricBuilder()
-                         .setDimension(
-                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at max task count"
-                         )
-                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
currentTaskCount));
-      } else if (optimalTaskCount == config.getTaskCountMin() || 
currentTaskCount == config.getTaskCountMin()) {
-        emitter.emit(getMetricBuilder()
-                         .setDimension(
-                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at min task count"
-                         )
-                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
currentTaskCount));
-      }
+    // Rollover-only scale-down mode: don't proactively scale down here.
+    if (config.isScaleDownOnTaskRolloverOnly() && optimalTaskCount < 
currentTaskCount) {
+      return currentTaskCount;
     }
-    return taskCount;
+
+    log.info(
+        "CostBasedAutoScaler for supervisor[%s] wants taskCount[%d] 
(current[%d]).",
+        supervisorId,
+        optimalTaskCount,
+        currentTaskCount
+    );
+    return optimalTaskCount;
   }
 
   public CostBasedAutoScalerConfig getConfig()
@@ -239,15 +195,9 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   }
 
   /**
-   * Computes the optimal task count based on current metrics.
-   * <p>
-   * Returns -1 (no scaling needed) in the following cases:
-   * <ul>
-   *   <li>Metrics are not available</li>
-   *   <li>Current task count already optimal</li>
-   * </ul>
-   *
-   * @return optimal task count, or -1 if no scaling action is needed
+   * Returns the lowest-cost task count given {@code metrics}, or {@link 
#CANNOT_COMPUTE} when
+   * metrics are unusable. Returning the current task count means the current 
count is already
+   * optimal (or no better candidate could be evaluated).
    */
   int computeOptimalTaskCount(CostMetrics metrics)
   {
@@ -259,13 +209,13 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
               .setDimension(DruidMetrics.DESCRIPTION, result.error())
               .setMetric(INVALID_METRICS_COUNT, 1L)
       );
-      return -1;
+      return CANNOT_COMPUTE;
     }
 
     final int partitionCount = metrics.getPartitionCount();
     final int currentTaskCount = metrics.getCurrentTaskCount();
     if (partitionCount <= 0 || currentTaskCount <= 0) {
-      return -1;
+      return CANNOT_COMPUTE;
     }
 
     final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
@@ -276,8 +226,9 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     );
 
     if (validTaskCounts.length == 0) {
+      // Return current count (not an error) so the supervisor can clamp it 
back into bounds.
       log.warn("No valid task counts after applying constraints for 
supervisor[%s]", supervisorId);
-      return -1;
+      return currentTaskCount;
     }
 
     // Start with the current task count as optimal
@@ -334,9 +285,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     emitter.emit(getMetricBuilder().setMetric(LAG_COST_METRIC, 
optimalCost.lagCost()));
     emitter.emit(getMetricBuilder().setMetric(IDLE_COST_METRIC, 
optimalCost.idleCost()));
 
-    if (optimalTaskCount == currentTaskCount) {
-      return -1;
-    } else {
+    if (optimalTaskCount != currentTaskCount) {
       log.info(
           "Optimal taskCount[%d] for supervisor[%s] has lowest cost[%.4f] out 
of the following candidates: %n%s",
           optimalTaskCount, supervisorId, optimalCost.totalCost(), 
constructCostTable(validTaskCounts, costResults)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index b96a1de7bd4..adb041911af 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -31,8 +31,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.utils.CollectionUtils;
 
 import java.util.ArrayList;
@@ -52,7 +50,6 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
   private final SeekableStreamSupervisor supervisor;
   private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
   private final ServiceEmitter emitter;
-  private final ServiceMetricEvent.Builder metricBuilder;
 
   private static final ReentrantLock LOCK = new ReentrantLock(true);
 
@@ -77,10 +74,6 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     this.spec = spec;
     this.supervisor = supervisor;
     this.emitter = emitter;
-    metricBuilder = ServiceMetricEvent.builder()
-                                      
.setDimension(DruidMetrics.SUPERVISOR_ID, spec.getId())
-                                      .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
-                                      .setDimension(DruidMetrics.STREAM, 
this.supervisor.getIoConfig().getStream());
   }
 
   @Override
@@ -88,7 +81,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
   {
     Callable<Integer> scaleAction = () -> {
       LOCK.lock();
-      int desiredTaskCount = -1;
+      int desiredTaskCount = CANNOT_COMPUTE;
       try {
         desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
       }
@@ -197,27 +190,9 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     };
   }
 
-  /**
-   * This method determines whether to do scale actions based on collected lag 
points.
-   * The current algorithm of scale is straightforward:
-   * <ul>
-   * <li>First, compute the proportion of lag points higher/lower than {@code 
scaleOutThreshold/scaleInThreshold},
-   * getting {@code scaleInThreshold/scaleOutThreshold},.
-   * <li>Secondly, compare {@code scaleInThreshold/scaleOutThreshold} with
-   * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
-   * <ul><li>P.S. Scale out action has a higher priority than scale in 
action.</ul>
-   * <li>Finally, if {@code scaleOutThreshold/scaleInThreshold}, is higher than
-   * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, 
scale out/in action would be triggered.
-   * </ul>
-   *
-   * @param lags the lag metrics of Stream (Kafka/Kinesis)
-   * @return Integer, target number of tasksCount. -1 means skip scale action.
-   */
   @VisibleForTesting
   int computeDesiredTaskCount(List<Long> lags)
   {
-    // if the supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug(
         "Computing the desired task count for supervisor[%s], based on 
following lags : [%s]",
         spec.getId(),
@@ -237,74 +212,30 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     double beyondProportion = beyond * 1.0 / metricsCount;
     double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is 
[%s] for supervisor[%s].", beyondProportion,
-        withinProportion, spec.getId()
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for 
supervisor[%s].",
+        beyondProportion,
+        withinProportion,
+        spec.getId()
     );
 
-    int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
-    int desiredActiveTaskCount;
     final int partitionCount = supervisor.getPartitionCount();
     if (partitionCount <= 0) {
       log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", 
spec.getId());
-      return -1;
+      return CANNOT_COMPUTE;
     }
 
-    final int actualTaskCountMax = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
-    final int actualTaskCountMin = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
-
-    // Take the current task count but clamp it to the configured boundaries 
if it is outside the boundaries.
-    // There might be a configuration instance with a handwritten taskCount 
that is outside the boundaries.
-    // If that is happening, take the bound and return early.
-    final boolean isTaskCountOutOfBounds = currentActiveTaskCount < 
actualTaskCountMin
-                                           || currentActiveTaskCount > 
actualTaskCountMax;
-    if (isTaskCountOutOfBounds) {
-      currentActiveTaskCount = Math.min(actualTaskCountMax, 
Math.max(actualTaskCountMin, currentActiveTaskCount));
-      return currentActiveTaskCount;
-    }
+    final int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
 
     if (beyondProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
-      // Do Scale out
-      final int taskCount = currentActiveTaskCount + 
lagBasedAutoScalerConfig.getScaleOutStep();
-      if (currentActiveTaskCount == actualTaskCountMax) {
-        log.debug(
-            "CurrentActiveTaskCount reached task count Max limit, skipping 
scale out action for supervisor[%s].",
-            spec.getId()
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at max task count"
-                         )
-                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
taskCount));
-        return -1;
-      } else {
-        desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
-      }
-      return desiredActiveTaskCount;
+      // scale-out: step up from current, capped by partition count 
(scaler-internal constraint)
+      return Math.min(currentActiveTaskCount + 
lagBasedAutoScalerConfig.getScaleOutStep(), partitionCount);
     }
-
     if (withinProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
-      // Do Scale in
-      final int taskCount = currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep();
-      if (currentActiveTaskCount == actualTaskCountMin) {
-        log.debug(
-            "CurrentActiveTaskCount reached task count Min limit[%d], skipping 
scale in action for supervisor[%s].",
-            actualTaskCountMin,
-            spec.getId()
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at min task count"
-                         )
-                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
taskCount));
-        return -1;
-      } else {
-        desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
-      }
-      return desiredActiveTaskCount;
+      return Math.max(1, currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep());
     }
-    return -1;
+    // Neither trigger fired; the scaler's preferred count is the current 
count.
+    return currentActiveTaskCount;
   }
 
   public LagBasedAutoScalerConfig getAutoScalerConfig()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 3d0c6426feb..8d1b5350e8c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -400,8 +400,11 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
+    // Use taskCountMax=10 so both the scaler and the supervisor's bounds 
agree and multiple
+    // scale-out iterations have headroom before hitting max (which is what 
lets us observe the
+    // "Scale cooldown not elapsed yet" skip reason emitted by the supervisor 
on later iterations).
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true, 
10)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -457,7 +460,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, 
true)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -510,7 +513,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -559,7 +562,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -606,7 +609,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, 
false)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(false)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -655,10 +658,42 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
   @Test
   public void 
testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() 
throws InterruptedException
   {
+    // Verifies that when the operator misconfigures taskCountMin above 
partitionCount, the
+    // supervisor's partition-count ceiling brings both bounds down to 
partitionCount and any
+    // scale action settles at partitionCount.
+    final Map<String, Object> misconfiguredProps = getScaleInProperties();
+    misconfiguredProps.put("taskCountMax", 20);
+    misconfiguredProps.put("taskCountMin", 15);
+    final AutoScalerConfig misconfiguredAutoScalerConfig =
+        mapper.convertValue(misconfiguredProps, AutoScalerConfig.class);
+
     EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, 
false)).anyTimes();
+    // Use an ioConfig whose autoScalerConfig matches the test's intent 
(min=15, max=20) so the
+    // supervisor's bounds and the scaler's bounds agree.
+    EasyMock.expect(spec.getIoConfig()).andReturn(new 
SeekableStreamSupervisorIOConfig(
+        "stream",
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        null,
+        new Period("PT1H"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        misconfiguredAutoScalerConfig,
+        LagAggregator.DEFAULT,
+        null,
+        null,
+        null,
+        null,
+        null
+    )
+    {
+    }).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -674,33 +709,29 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.replay(taskMaster);
 
     TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
-    Map<String, Object> modifiedScaleInProps = getScaleInProperties();
-
-    modifiedScaleInProps.put("taskCountMax", 20);
-    modifiedScaleInProps.put("taskCountMin", 15);
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
         mapper.convertValue(
-            modifiedScaleInProps,
+            misconfiguredProps,
             LagBasedAutoScalerConfig.class
         ),
         spec,
         emitter
     );
 
-    // enable autoscaler so that taskcount config will be ignored and the init 
value of taskCount will use taskCountMin.
-    Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+    // Initial taskCount comes from the ioConfig's 
autoScalerConfig.taskCountMin (15) since
+    // taskCount was passed null in the ioConfig above.
+    Assert.assertEquals(15, (int) supervisor.getIoConfig().getTaskCount());
     supervisor.getIoConfig().setTaskCount(2);
 
-    // When
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
 
     Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
     Thread.sleep(2000);
-    // Then
+    // Supervisor caps min/max at partitionCount=10, so the first scale 
settles at partitionCount.
     Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
 
     
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
@@ -715,8 +746,34 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
+    // taskCountMin=2 so scaler's floored output (1) is below min and triggers 
the clamp.
+    final Map<String, Object> scaleInProps = getScaleInProperties();
+    scaleInProps.put("taskCountMin", 2);
+    final SeekableStreamSupervisorIOConfig customIoConfig = new 
SeekableStreamSupervisorIOConfig(
+        "stream",
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        null,
+        new Period("PT1H"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        mapper.convertValue(scaleInProps, AutoScalerConfig.class),
+        LagAggregator.DEFAULT,
+        null,
+        null,
+        null,
+        null,
+        null
+    )
+    {
+    };
+
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(customIoConfig).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -737,19 +794,20 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
       @Override
       public int getActiveTaskGroupsCount()
       {
-        return 1;
+        return 2;
       }
     };
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
         mapper.convertValue(
-            getScaleInProperties(),
+            scaleInProps,
             LagBasedAutoScalerConfig.class
         ),
         spec,
         dynamicActionEmitter
     );
+    supervisor.getIoConfig().setTaskCount(2);
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
@@ -1180,7 +1238,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -1246,7 +1304,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     // Suspended → DynamicAllocationTasksNotice should return early and not 
scale
@@ -1294,7 +1352,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, 
true)).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(true)).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
@@ -1452,7 +1510,12 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.replay(ingestionSchema);
   }
 
-  private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean 
scaleOut)
+  private SeekableStreamSupervisorIOConfig getIOConfig(boolean scaleOut)
+  {
+    return getIOConfig(scaleOut, 2);
+  }
+
+  private SeekableStreamSupervisorIOConfig getIOConfig(boolean scaleOut, int 
maxTaskCount)
   {
     if (scaleOut) {
       return new SeekableStreamSupervisorIOConfig(
@@ -1467,7 +1530,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
           new Period("PT30M"),
           null,
           null,
-          mapper.convertValue(getScaleOutProperties(2), 
AutoScalerConfig.class),
+          mapper.convertValue(getScaleOutProperties(maxTaskCount), 
AutoScalerConfig.class),
           LagAggregator.DEFAULT,
           null,
           null,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index eff5d1acd98..d61049777c8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3546,6 +3546,18 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     {
       return state;
     }
+
+    /**
+     * The shared record-supplier mock in this test returns a single-partition 
stream, which would
+     * otherwise cause the supervisor's partition-count ceiling in 
DynamicAllocationTasksNotice to
+     * clamp every cooldown-test scale down to 1. Report a large partition 
count so the cooldown
+     * tests can exercise bounds at the autoscaler-config level only.
+     */
+    @Override
+    public int getPartitionCount()
+    {
+      return 1_000;
+    }
   }
 
   private class TestEmittingTestSeekableStreamSupervisor extends 
BaseTestSeekableStreamSupervisor
@@ -4030,6 +4042,125 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     assertScaleSkipped(events.get(1), 1, "Scale cooldown not elapsed yet");
   }
 
+  @Test
+  public void testDynamicAllocationClampsDesiredAboveMaxToMax()
+  {
+    // Scaler returns 20, but taskCountMax=10. Supervisor must clamp and scale 
to 10.
+    final StubServiceEmitter scalingEmitter =
+        setupSupervisorForAutoScalingTest(0L, 0L, 3, 1, 10);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    supervisor.handleDynamicAllocationTasksNotice(() -> 20, () -> {}, 
scalingEmitter);
+
+    // Task count is clamped to max (10), not the scaler's desired (20).
+    Assert.assertEquals(10, supervisor.getIoConfig().getTaskCount());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals(1, events.size());
+    // The emitted metric value reflects the scaler's unclamped desired (the 
operator hint), not
+    // the clamped value the supervisor actually applied.
+    assertScaledToTaskCount(events.get(0), 20);
+  }
+
+  @Test
+  public void testDynamicAllocationClampsDesiredBelowMinToMin()
+  {
+    // Scaler returns 1, but taskCountMin=3. Supervisor must clamp and scale 
to 3.
+    final StubServiceEmitter scalingEmitter =
+        setupSupervisorForAutoScalingTest(0L, 0L, 5, 3, 10);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals(1, events.size());
+    assertScaledToTaskCount(events.get(0), 1);
+  }
+
+  @Test
+  public void 
testDynamicAllocationEmitsAlreadyAtMaxWhenCurrentIsAtMaxAndDesiredAboveMax()
+  {
+    // Current (10) is already at configured max (10). Scaler wants 15 (above 
max). Supervisor
+    // clamps to 10 which equals current -> emits "Already at max task count" 
skip reason.
+    final StubServiceEmitter scalingEmitter =
+        setupSupervisorForAutoScalingTest(0L, 0L, 10, 1, 10);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    supervisor.handleDynamicAllocationTasksNotice(() -> 15, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals("Task count must not change when at max", 10, 
supervisor.getIoConfig().getTaskCount());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals(1, events.size());
+    assertScaleSkipped(events.get(0), 15, "Already at max task count");
+  }
+
+  @Test
+  public void 
testDynamicAllocationEmitsAlreadyAtMinWhenCurrentIsAtMinAndDesiredBelowMin()
+  {
+    // Current (3) is already at configured min (3). Scaler wants 1 (below 
min). Supervisor clamps
+    // to 3 which equals current -> emits "Already at min task count" skip 
reason.
+    final StubServiceEmitter scalingEmitter =
+        setupSupervisorForAutoScalingTest(0L, 0L, 3, 3, 10);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals("Task count must not change when at min", 3, 
supervisor.getIoConfig().getTaskCount());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals(1, events.size());
+    assertScaleSkipped(events.get(0), 1, "Already at min task count");
+  }
+
+  @Test
+  public void testDynamicAllocationEmitsNothingWhenDesiredEqualsCurrent()
+  {
+    // No skip metric in the steady-state no-op (avoid emitting on every tick).
+    final StubServiceEmitter scalingEmitter =
+        setupSupervisorForAutoScalingTest(0L, 0L, 5, 1, 10);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertTrue("No metric should be emitted in the steady-state no-op 
case", events.isEmpty());
+  }
+
+  @Test
+  public void 
testDynamicAllocationEmitsPathologicalSkipReasonWhenScalerReturnsNonPositive()
+  {
+    // Scaler contract: a non-positive return means "I could not compute a 
useful answer".
+    // Supervisor must not scale and must emit a skip reason surfacing the 
scaler's failure.
+    final StubServiceEmitter scalingEmitter =
+        setupSupervisorForAutoScalingTest(0L, 0L, 5, 1, 10);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    supervisor.handleDynamicAllocationTasksNotice(() -> -1, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals("Task count must not change on pathological return", 
5, supervisor.getIoConfig().getTaskCount());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals(1, events.size());
+    assertScaleSkipped(events.get(0), -1, "Auto-scaler failed to compute a 
task count");
+  }
+
   /**
    * Asserts that a required-tasks emission represents a scale event: it 
carries the standard
    * supervisor/datasource/stream dims, no scalingSkipReason dim, and the 
metric value matches the
@@ -4096,10 +4227,23 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
       long minScaleDownDelayMillis,
       int initialTaskCount
   )
+  {
+    return setupSupervisorForAutoScalingTest(minScaleUpDelayMillis, 
minScaleDownDelayMillis, initialTaskCount, 1, 100);
+  }
+
+  private StubServiceEmitter setupSupervisorForAutoScalingTest(
+      long minScaleUpDelayMillis,
+      long minScaleDownDelayMillis,
+      int initialTaskCount,
+      int taskCountMin,
+      int taskCountMax
+  )
   {
     final AutoScalerConfig autoScalerConfig = testAutoScalerConfig(
         minScaleUpDelayMillis,
-        minScaleDownDelayMillis
+        minScaleDownDelayMillis,
+        taskCountMin,
+        taskCountMax
     );
     final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(
         initialTaskCount,
@@ -4113,6 +4257,16 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
    * Returns a minimal test-only {@link AutoScalerConfig}
    */
   private static AutoScalerConfig testAutoScalerConfig(long 
minScaleUpDelayMillis, long minScaleDownDelayMillis)
+  {
+    return testAutoScalerConfig(minScaleUpDelayMillis, 
minScaleDownDelayMillis, 1, 100);
+  }
+
+  private static AutoScalerConfig testAutoScalerConfig(
+      long minScaleUpDelayMillis,
+      long minScaleDownDelayMillis,
+      int taskCountMin,
+      int taskCountMax
+  )
   {
     return new AutoScalerConfig()
     {
@@ -4143,13 +4297,13 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
       @Override
       public int getTaskCountMax()
       {
-        return 100;
+        return taskCountMax;
       }
 
       @Override
       public int getTaskCountMin()
       {
-        return 1;
+        return taskCountMin;
       }
 
       @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 6466b0966e8..129569864f9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -23,13 +23,10 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.List;
@@ -38,7 +35,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CostBasedAutoScalerMockTest
@@ -97,8 +93,10 @@ public class CostBasedAutoScalerMockTest
   }
 
   @Test
-  public void testNoOpWhenOptimalEqualsCurrent()
+  public void testReturnsOptimalWhenOptimalEqualsCurrent()
   {
+    // Scaler contract: return the optimal count the scaler wants, regardless 
of current/bounds.
+    // The supervisor handles the "equal to current -> no scale" decision.
     CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
 
     int currentTaskCount = 25;
@@ -109,7 +107,7 @@ public class CostBasedAutoScalerMockTest
 
     int result = autoScaler.computeTaskCountForScaleAction();
 
-    Assert.assertEquals("Should return -1 when it equals current (no change 
needed)", -1, result);
+    Assert.assertEquals("Scaler should return its optimal count even when it 
equals current", optimalCount, result);
   }
 
   @Test
@@ -176,8 +174,10 @@ public class CostBasedAutoScalerMockTest
   }
 
   @Test
-  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+  public void testReturnsUnclampedOptimalBelowMin()
   {
+    // Scaler no longer clamps to the autoScalerConfig bounds — the supervisor 
does.
+    // Verify the scaler returns the raw optimal even when it is below 
taskCountMin.
     CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
                                                                        
.taskCountMax(100)
                                                                        
.taskCountMin(50)
@@ -186,25 +186,25 @@ public class CostBasedAutoScalerMockTest
     CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
 
     final int configuredTaskCount = 1;
-    final int taskCountMin = 50;
+    final int belowMinOptimal = 49; // 1 below taskCountMin; expect unchanged 
through scaler
 
-    // Mock computeOptimalTaskCount to return a value different from the 
boundary,
-    // so the assertion proves the boundary clamping path was taken.
-    doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any());
+    doReturn(belowMinOptimal).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 
0.2);
 
     final int result = autoScaler.computeTaskCountForScaleAction();
 
     Assert.assertEquals(
-        "Should scale to taskCountMin when the configured task count is below 
the minimum boundary",
-        taskCountMin,
+        "Scaler should return unclamped optimal; clamping is a supervisor 
concern",
+        belowMinOptimal,
         result
     );
   }
 
   @Test
-  public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+  public void testReturnsUnclampedOptimalAboveMax()
   {
+    // Scaler no longer clamps to the autoScalerConfig bounds — the supervisor 
does.
+    // Verify the scaler returns the raw optimal even when it is above 
taskCountMax.
     CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
                                                                        
.taskCountMax(50)
                                                                        
.taskCountMin(1)
@@ -213,18 +213,16 @@ public class CostBasedAutoScalerMockTest
     CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
 
     final int configuredTaskCount = 100;
-    final int taskCountMax = 50;
+    final int aboveMaxOptimal = 51; // 1 above taskCountMax; expect unchanged 
through scaler
 
-    // Mock computeOptimalTaskCount to return a value different from the 
boundary,
-    // so the assertion proves the boundary clamping path was taken.
-    doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any());
+    doReturn(aboveMaxOptimal).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);
 
     final int result = autoScaler.computeTaskCountForScaleAction();
 
     Assert.assertEquals(
-        "Should scale to taskCountMax when the configured task count is above 
the maximum boundary",
-        taskCountMax,
+        "Scaler should return unclamped optimal; clamping is a supervisor 
concern",
+        aboveMaxOptimal,
         result
     );
   }
@@ -292,6 +290,9 @@ public class CostBasedAutoScalerMockTest
   @Test
   public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled()
   {
+    // When scaleDownDuringTaskRolloverOnly is true and the optimal would be a 
scale-down, the
+    // scaler's "preferred" count is to stay put — it signals that by 
returning the current count.
+    // The supervisor interprets equal-to-current as a steady-state no-op and 
skips silently.
     CostBasedAutoScalerConfig rolloverOnlyConfig = 
CostBasedAutoScalerConfig.builder()
                                                                             
.taskCountMax(100)
                                                                             
.taskCountMin(1)
@@ -314,8 +315,8 @@ public class CostBasedAutoScalerMockTest
     setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
 
     Assert.assertEquals(
-        "Should return -1 when scaleDownDuringTaskRolloverOnly is true",
-        -1,
+        "Should return current count (no-op signal) when 
scaleDownDuringTaskRolloverOnly suppresses the scale-down",
+        currentTaskCount,
         autoScaler.computeTaskCountForScaleAction()
     );
   }
@@ -354,88 +355,8 @@ public class CostBasedAutoScalerMockTest
     );
   }
 
-  @Test
-  public void testEmitsMaxTaskCountSkipReasonWhenCurrentIsAtMax()
-  {
-    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                       
.taskCountMax(10)
-                                                                       
.taskCountMin(1)
-                                                                       
.enableTaskAutoScaler(true)
-                                                                       
.build();
-    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
-    final int currentTaskCount = 10; // already at max
-    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
-    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    verify(mockEmitter).emit(captor.capture());
-    Assert.assertEquals(
-        "Should emit 'Already at max task count' skip reason when current task 
count is at maximum",
-        "Already at max task count",
-        ((ServiceMetricEvent.Builder) captor.getValue())
-            
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
-    );
-  }
-
-  @Test
-  public void testEmitsMinTaskCountSkipReasonWhenCurrentIsAtMin()
-  {
-    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                       
.taskCountMax(100)
-                                                                       
.taskCountMin(10)
-                                                                       
.enableTaskAutoScaler(true)
-                                                                       
.build();
-    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
-    final int currentTaskCount = 10; // already at min
-    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
-    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    verify(mockEmitter).emit(captor.capture());
-    Assert.assertEquals(
-        "Should emit 'Already at min task count' skip reason when current task 
count is at minimum",
-        "Already at min task count",
-        ((ServiceMetricEvent.Builder) captor.getValue())
-            
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
-    );
-  }
-
-  @Test
-  public void testMaxSkipReasonTakesPriorityWhenMinEqualsMax()
-  {
-    // When min == max, current is simultaneously at both bounds.
-    // The comment in the production code states that signaling max has higher 
priority.
-    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                       
.taskCountMax(5)
-                                                                       
.taskCountMin(5)
-                                                                       
.enableTaskAutoScaler(true)
-                                                                       
.build();
-    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
-    final int currentTaskCount = 5; // at both min and max
-    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
-    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    verify(mockEmitter).emit(captor.capture());
-    Assert.assertEquals(
-        "Max skip reason should take priority over min skip reason when min 
equals max",
-        "Already at max task count",
-        ((ServiceMetricEvent.Builder) captor.getValue())
-            
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
-    );
-  }
+  // Skip-reason emissions ("Already at max/min task count") moved to 
SeekableStreamSupervisor —
+  // see SeekableStreamSupervisorStateTest for those assertions.
 
   private void setupMocksForMetricsCollection(
       CostBasedAutoScaler autoScaler,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
index ffbfee77b72..32e2f378fdb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -94,20 +94,28 @@ public class LagBasedAutoScalerTest
   }
 
   @Test
-  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+  public void testReturnsUnclampedScaleOutStepEvenWhenCurrentIsBelowMin()
   {
+    // Scaler no longer clamps to taskCountMin; it just steps the current 
count up by scaleOutStep.
+    // The supervisor is responsible for clamping below-min results up to 
taskCountMin.
     when(mockIoConfig.getTaskCount()).thenReturn(1);
     when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
 
-    Assert.assertEquals(50, 
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
+    // current (1) + scaleOutStep (4) = 5 — below configured taskCountMin 
(50); not clamped here.
+    Assert.assertEquals(5, 
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
   }
 
   @Test
-  public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+  public void testReturnsUnclampedScaleInStepEvenWhenCurrentIsAboveMax()
   {
+    // Scaler no longer clamps to taskCountMax; it just steps the current 
count down by scaleInStep.
+    // The supervisor is responsible for clamping above-max results down to 
taskCountMax.
     when(mockIoConfig.getTaskCount()).thenReturn(101);
     when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
 
+    // current (101) - scaleInStep (1) = 100. The scaler does not clamp to
+    // taskCountMax (100); the result merely happens to equal it in this case.
+    // We only want to verify the scaler's raw computation.
     Assert.assertEquals(100, 
createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L)));
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
index 17cd347231a..b222c4221dd 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -19,20 +19,34 @@
 
 package org.apache.druid.indexing.overlord.supervisor.autoscaler;
 
+/**
+ * Task-count auto-scaler driven by a streaming supervisor.
+ * <p>
+ * Return-value contract for {@link #computeTaskCountForRollover()} and any 
implementation-specific
+ * scale-action method:
+ * <ul>
+ *   <li>{@link #CANNOT_COMPUTE} when no preferred count can be computed.</li>
+ *   <li>Otherwise, a preferred task count {@code >= 1}, unclamped by min/max 
bounds. The supervisor
+ *       clamps to {@code [taskCountMin, taskCountMax]} and decides whether to 
scale. Scaling to
+ *       zero (idle) is the supervisor's responsibility, not the 
autoscaler's.</li>
+ * </ul>
+ */
 public interface SupervisorTaskAutoScaler
 {
+  /** Sentinel for "no preferred task count available". The supervisor will 
skip scaling. */
+  int CANNOT_COMPUTE = -1;
+
   void start();
   void stop();
   void reset();
 
   /**
-   * Computes the optimal task count during task rollover, allowing a 
non-disruptive scale-down.
-   * Must be called by the supervisor when tasks are ending their duration.
-   *
-   * @return optimal task count for scale-down, or -1 if no change needed
+   * Preferred task count when a task group is rolling over, allowing a 
non-disruptive scale-down.
+   * Called by the supervisor when tasks are ending their duration. See the 
type-level Javadoc for
+   * the return-value contract.
    */
   default int computeTaskCountForRollover()
   {
-    return -1;
+    return CANNOT_COMPUTE;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to