This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 67abdc2012f Add optional plugins to basic cost function in 
CostBasedAutoScaler (#18976)
67abdc2012f is described below

commit 67abdc2012f51b31f288aa7feb5c71baf7cb9a4c
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Thu Feb 5 18:04:23 2026 +0200

    Add optional plugins to basic cost function in CostBasedAutoScaler (#18976)
    
    Changes:
    - separate out the pure cost function logic, making all additional
      logic opt-in via config;
    - replace `scaleDownBarrier` with `minScaleDownDelay`, which is now a
      `Duration`;
    - rework high-lag fast scale-up: use a logarithmic scaling formula for
      idle decay under high lag and for the task count boundaries.
    
    Details:
    This change replaces the sqrt-based scaling formula with a logarithmic
    formula that provides more aggressive emergency recovery at low task
    counts and when lag reaches the millions.
    
    Idle decay: `ln(lagSeverity) / ln(maxSeverity)`. This is less aggressive
    and scales well with lag growth.
    
    The formula `K = P / (6.4 * sqrt(C))` means that small task counts get
    massive K values (emergency recovery), while large task counts get
    smaller K values (stability).
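
    For illustration, a minimal standalone sketch of the two formulas above.
    The class name `LogScalingExample` and the 48-partition / 50K-threshold /
    200K-lag inputs are assumed example values; maxSeverity = 5 mirrors
    `LAG_AMPLIFICATION_MAX_SEVERITY` below, and the authoritative code is
    `computeExtraPPTIncrease` and `estimateIdleRatio` in the diff:

        // Worked example of the burst scale-up and idle-decay formulas.
        // Values are illustrative; see the diff below for the real code.
        public class LogScalingExample
        {
          public static void main(String[] args)
          {
            final int partitionCount = 48;          // P
            final double lagThreshold = 50_000;     // per-partition high-lag threshold
            final double lagPerPartition = 200_000; // observed lag per partition
            final double lagSeverity = lagPerPartition / lagThreshold; // 4.0

            for (int currentTaskCount : new int[]{1, 4, 16}) {
              // K = (P / 6.4) / sqrt(C): large for small C (emergency recovery),
              // small for large C (stability)
              final double k = (partitionCount / 6.4) / Math.sqrt(currentTaskCount);
              final double deltaTasks = k * Math.log(lagSeverity);
              System.out.printf("C=%d -> K=%.1f, deltaTasks=%.1f%n", currentTaskCount, k, deltaTasks);
            }

            // Idle decay: lagBusyFactor = ln(lagSeverity) / ln(maxSeverity), capped at 1.0
            final double lagBusyFactor = Math.min(1.0, Math.log(lagSeverity) / Math.log(5));
            System.out.printf("lagBusyFactor = %.2f%n", lagBusyFactor);
          }
        }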
---
 docs/ingestion/supervisor.md                       |  12 +-
 .../CostBasedAutoScalerIntegrationTest.java        |  13 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java | 164 ++++++---
 .../autoscaler/CostBasedAutoScalerConfig.java      |  98 +++--
 .../autoscaler/WeightedCostFunction.java           |  67 ++--
 .../autoscaler/CostBasedAutoScalerConfigTest.java  |  48 ++-
 .../CostBasedAutoScalerHighLagScalingTest.java     | 154 ++++++++
 .../autoscaler/CostBasedAutoScalerMockTest.java    |  66 ++--
 .../autoscaler/CostBasedAutoScalerTest.java        | 410 ++++++++++++---------
 .../autoscaler/WeightedCostFunctionTest.java       |  88 ++++-
 10 files changed, 733 insertions(+), 387 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 00256e4ea9c..95991d4d5cf 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -203,13 +203,15 @@ Note: Kinesis is not supported yet, support is in 
progress.
 The following table outlines the configuration properties related to the 
`costBased` autoscaler strategy:
 
 | Property|Description|Required|Default|
-|---------|---------------------------------------------------|---|-----|
-|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered. | No | 60000 |
-|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25|
+|---------|-----------|--------|-------|
+|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered. | No | 600000 |
+|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
 |`idleWeight`|The weight of extracted poll idle value in cost function. | No | 
0.75 |
 |`defaultProcessingRate`|A planned processing rate per task, required for 
first cost estimations. | No | 1000 |
-|`scaleDownBarrier`| A number of successful scale down attempts which should 
be skipped  to prevent the auto-scaler from scaling down tasks immediately.  | 
No | 5 |
-|`scaleDownDuringTaskRolloverOnly`| Indicates whether task scaling down is 
limited to periods during task rollovers only. | No | False |
+|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when 
selecting task counts.|No| `false` |
+|`highLagThreshold`|Per-partition lag threshold that triggers burst scale-up 
when set to a value greater than `0`. Set to a negative value to disable burst 
scale-up.|No|-1|
+|`minScaleDownDelay`|Minimum duration between successful scale actions, 
specified as an ISO-8601 duration string.|No|`PT30M`|
+|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is 
limited to periods during task rollovers only.|No|`false`|
 
 The following example shows a supervisor spec with `lagBased` autoscaler:
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index ad23e0b6fbe..1138aacf6cf 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -127,13 +128,13 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
         .taskCountMin(1)
         .taskCountMax(100)
         .taskCountStart(initialTaskCount)
-        .scaleActionPeriodMillis(1500)
-        .minTriggerScaleActionFrequencyMillis(3000)
+        .scaleActionPeriodMillis(1900)
+        .minTriggerScaleActionFrequencyMillis(2000)
         // Weight configuration: strongly favor lag reduction over idle time
         .lagWeight(0.9)
         .idleWeight(0.1)
         .scaleDownDuringTaskRolloverOnly(false)
-        .scaleDownBarrier(1)
+        .minScaleDownDelay(Duration.ZERO)
         .build();
 
     final KafkaSupervisorSpec spec = 
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, 
initialTaskCount);
@@ -147,10 +148,10 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
                                         .hasDimension(DruidMetrics.DATASOURCE, 
dataSource));
 
     // Wait for autoscaler to emit optimalTaskCount metric indicating 
scale-down
-    // We expect the optimal task count to 4
+    // We expect the optimal task count to be at most 6
     overlord.latchableEmitter().waitForEvent(
         event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
-                      .hasValueMatching(Matchers.equalTo(6L))
+                      .hasValueMatching(Matchers.lessThanOrEqualTo(6L))
     );
 
     // Suspend the supervisor
@@ -229,7 +230,7 @@ public class CostBasedAutoScalerIntegrationTest extends 
EmbeddedClusterTestBase
         .idleWeight(0.9)
         .scaleDownDuringTaskRolloverOnly(true)
         // Do not slow scale-downs
-        .scaleDownBarrier(0)
+        .minScaleDownDelay(Duration.ZERO)
         .build();
 
     final KafkaSupervisorSpec spec = 
createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, 
initialTaskCount);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 060240014be..8bf2e8ab787 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -27,6 +27,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -55,31 +56,20 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 {
   private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
 
-  private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
-  private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
-  /**
-   * Defines the step size used for evaluating lag when computing scaling 
actions.
-   * This constant helps control the granularity of lag considerations in 
scaling decisions,
-   * ensuring smoother transitions between scaled states and avoiding abrupt 
changes in task counts.
-   */
-  private static final int LAG_STEP = 50_000;
-  /**
-   * This parameter fine-tunes autoscaling behavior by adding extra flexibility
-   * when calculating maximum allowable partitions per task in response to lag,
-   * which must be processed as fast, as possible.
-   * It acts as a foundational factor that balances the responsiveness and 
stability of autoscaling.
-   */
-  private static final int BASE_RAW_EXTRA = 5;
-  // Base PPT lag threshold allowing to activate a burst scaleup to eliminate 
high lag.
-  static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 25_000;
-  // Extra PPT lag threshold allowing activation of even more aggressive 
scaleup to eliminate high lag,
-  // also enabling lag-amplified idle calculation decay in the cost function 
(to reduce idle weight).
-  static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
-
   public static final String LAG_COST_METRIC = 
"task/autoScaler/costBased/lagCost";
   public static final String IDLE_COST_METRIC = 
"task/autoScaler/costBased/idleCost";
   public static final String OPTIMAL_TASK_COUNT_METRIC = 
"task/autoScaler/costBased/optimalTaskCount";
 
+  static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
+  static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+
+  /**
+   * Divisor for partition count in the K formula: K = (partitionCount / 
K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
+   * This controls how aggressive the scaling is relative to partition count.
+   * That value was chosen by carefully analyzing the math model behind the 
implementation.
+   */
+  static final double K_PARTITION_DIVISOR = 6.4;
+
   private final String supervisorId;
   private final SeekableStreamSupervisor supervisor;
   private final ServiceEmitter emitter;
@@ -90,7 +80,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   private final WeightedCostFunction costFunction;
   private volatile CostMetrics lastKnownMetrics;
 
-  private int scaleDownCounter = 0;
+  private volatile long lastScaleActionTimeMillis = -1;
 
   public CostBasedAutoScaler(
       SeekableStreamSupervisor supervisor,
@@ -106,7 +96,6 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     this.emitter = emitter;
 
     this.costFunction = new WeightedCostFunction();
-
     this.autoscalerExecutor = 
Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
                                                             + 
StringUtils.encodeForFormat(spec.getId()));
     this.metricBuilder = ServiceMetricEvent.builder()
@@ -117,6 +106,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
                                            );
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void start()
   {
@@ -153,7 +143,6 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     if (config.isScaleDownOnTaskRolloverOnly()) {
       return computeOptimalTaskCount(lastKnownMetrics);
     } else {
-      scaleDownCounter = 0;
       return -1;
     }
   }
@@ -166,16 +155,16 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     // Perform scale-up actions; scale-down actions only if configured.
     int taskCount = -1;
-    if (optimalTaskCount > currentTaskCount) {
+    if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
-      scaleDownCounter = 0; // Nullify the scaleDown counter after a 
successful scaleup too.
+      lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
       log.info("New task count [%d] on supervisor [%s], scaling up", 
taskCount, supervisorId);
     } else if (!config.isScaleDownOnTaskRolloverOnly()
+               && isScaleActionAllowed()
                && optimalTaskCount < currentTaskCount
-               && optimalTaskCount > 0
-               && ++scaleDownCounter >= config.getScaleDownBarrier()) {
+               && optimalTaskCount > 0) {
       taskCount = optimalTaskCount;
-      scaleDownCounter = 0;
+      lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
       log.info("New task count [%d] on supervisor [%s], scaling down", 
taskCount, supervisorId);
     } else {
       log.info("No scaling required for supervisor [%s]", supervisorId);
@@ -217,7 +206,9 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
         currentTaskCount,
         (long) metrics.getAggregateLag(),
         config.getTaskCountMin(),
-        config.getTaskCountMax()
+        config.getTaskCountMax(),
+        config.shouldUseTaskCountBoundaries(),
+        config.getHighLagThreshold()
     );
 
     if (validTaskCounts.length == 0) {
@@ -228,11 +219,20 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     int optimalTaskCount = -1;
     CostResult optimalCost = new CostResult();
 
+    log.info(
+        "Current metrics: avgPartitionLag[%.1f], pollIdleRatio[%.1f], 
lagWeight[%.1f], idleWeight[%.1f]",
+        metrics.getAggregateLag(),
+        metrics.getPollIdleRatio(),
+        config.getLagWeight(),
+        config.getIdleWeight()
+    );
+
     for (int taskCount : validTaskCounts) {
       CostResult costResult = costFunction.computeCost(metrics, taskCount, 
config);
       double cost = costResult.totalCost();
-      log.debug(
-          "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+
+      log.info(
+          "Proposed task count[%d] has total cost[%.4f] = lagCost[%.4f] + 
idleCost[%.4f].",
           taskCount,
           cost,
           costResult.lagCost(),
@@ -268,18 +268,19 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   }
 
   /**
-   * Generates valid task counts based on partitions-per-task ratios and 
lag-driven PPT relaxation.
-   * This enables gradual scaling and avoids large jumps.
-   * Limits the range of task counts considered to avoid excessive computation.
+   * Generates valid task counts based on partitions-per-task ratios.
    *
    * @return sorted list of valid task counts within bounds
    */
+  @SuppressWarnings({"ReassignedVariable"})
   static int[] computeValidTaskCounts(
       int partitionCount,
       int currentTaskCount,
       double aggregateLag,
       int taskCountMin,
-      int taskCountMax
+      int taskCountMax,
+      boolean isTaskCountBoundariesEnabled,
+      int highLagThreshold
   )
   {
     if (partitionCount <= 0 || currentTaskCount <= 0) {
@@ -288,22 +289,33 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     IntSet result = new IntArraySet();
     final int currentPartitionsPerTask = partitionCount / currentTaskCount;
-    final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
-        aggregateLag,
-        partitionCount,
-        currentTaskCount,
-        taskCountMax
-    );
-    final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + 
extraIncrease;
 
     // Minimum partitions per task correspond to the maximum number of tasks 
(scale up) and vice versa.
-    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 
effectiveMaxIncrease);
-    final int maxPartitionsPerTask = Math.min(
-        partitionCount,
-        currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
-    );
+    int minPartitionsPerTask = Math.min(1, partitionCount / taskCountMax);
+    int maxPartitionsPerTask = Math.max(partitionCount, partitionCount / 
taskCountMin);
 
-    for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= 
minPartitionsPerTask; partitionsPerTask--) {
+    if (isTaskCountBoundariesEnabled) {
+      maxPartitionsPerTask = Math.min(
+          partitionCount,
+          currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
+      );
+
+      int extraIncrease = 0;
+      if (highLagThreshold > 0) {
+        extraIncrease = computeExtraPPTIncrease(
+            highLagThreshold,
+            aggregateLag,
+            partitionCount,
+            currentTaskCount,
+            taskCountMax
+        );
+      }
+      int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + 
extraIncrease;
+      minPartitionsPerTask = Math.max(minPartitionsPerTask, 
currentPartitionsPerTask - effectiveMaxIncrease);
+    }
+
+    for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= 
minPartitionsPerTask
+                                                       && partitionsPerTask != 
0; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / 
partitionsPerTask;
       if (taskCount >= taskCountMin && taskCount <= taskCountMax) {
         result.add(taskCount);
@@ -314,32 +326,46 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
   /**
    * Computes extra allowed increase in partitions-per-task in scenarios when 
the average per-partition lag
-   * is above the configured threshold. By default, it is {@code 
EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD}.
-   * Generally, one of the autoscaler priorities is to keep the lag as close 
to zero as possible.
+   * is above the configured threshold.
+   * <p>
+   * This uses a logarithmic formula for consistent absolute growth:
+   * {@code deltaTasks = K * ln(lagSeverity)}
+   * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}
+   * <p>
+   * This ensures that small task counts get a massive relative boost,
+   * while large task counts receive more measured, stable increases.
    */
-  static int computeExtraMaxPartitionsPerTaskIncrease(
+  static int computeExtraPPTIncrease(
+      double lagThreshold,
       double aggregateLag,
       int partitionCount,
       int currentTaskCount,
       int taskCountMax
   )
   {
-    if (partitionCount <= 0 || taskCountMax <= 0) {
+    if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) {
       return 0;
     }
 
     final double lagPerPartition = aggregateLag / partitionCount;
-    if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+    if (lagPerPartition < lagThreshold) {
       return 0;
     }
 
-    int rawExtra = BASE_RAW_EXTRA;
-    if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) {
-      rawExtra += (int) ((lagPerPartition - 
AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP);
-    }
+    final double lagSeverity = lagPerPartition / lagThreshold;
+
+    // Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1
+    // The first multiplier decreases with sqrt(currentTaskCount): aggressive when small, conservative when large
+    final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) / 
Math.sqrt(currentTaskCount) * Math.log(
+        lagSeverity);
+
+    final double targetTaskCount = Math.min(taskCountMax, (double) 
currentTaskCount + deltaTasks);
 
-    final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount 
/ taskCountMax);
-    return (int) (rawExtra * headroomRatio);
+    // Compute precise PPT reduction to avoid early integer truncation 
artifacts
+    final double currentPPT = (double) partitionCount / currentTaskCount;
+    final double targetPPT = (double) partitionCount / targetTaskCount;
+
+    return Math.max(0, (int) Math.floor(currentPPT - targetPPT));
   }
 
   /**
@@ -464,4 +490,22 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     );
   }
 
+  /**
+   * Determines if a scale action is currently allowed based on the elapsed 
time
+   * since the last scale action and the configured minimum scale-down delay.
+   */
+  private boolean isScaleActionAllowed()
+  {
+    if (lastScaleActionTimeMillis < 0) {
+      return true;
+    }
+
+    final long barrierMillis = config.getMinScaleDownDelay().getMillis();
+    if (barrierMillis <= 0) {
+      return true;
+    }
+
+    final long elapsedMillis = DateTimes.nowUtc().getMillis() - 
lastScaleActionTimeMillis;
+    return elapsedMillis >= barrierMillis;
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index d835fea5157..cb791abefb3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -48,12 +49,12 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   static final double DEFAULT_LAG_WEIGHT = 0.25;
   static final double DEFAULT_IDLE_WEIGHT = 0.75;
   static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec 
per task as default
-  static final int DEFAULT_SCALE_DOWN_BARRIER = 5; // We delay scale down by 5 
* DEFAULT_SCALE_ACTION_PERIOD_MILLIS
+  static final Duration DEFAULT_MIN_SCALE_DELAY = 
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
 
   private final boolean enableTaskAutoScaler;
   private final int taskCountMax;
   private final int taskCountMin;
-  private Integer taskCountStart;
+  private final Integer taskCountStart;
   private final long minTriggerScaleActionFrequencyMillis;
   private final Double stopTaskCountRatio;
   private final long scaleActionPeriodMillis;
@@ -61,17 +62,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   private final double lagWeight;
   private final double idleWeight;
   private final double defaultProcessingRate;
-  /**
-   * Represents the threshold value used to prevent the auto-scaler from 
scaling down tasks immediately,
-   * when the computed cost-based metrics fall below this barrier.
-   * A higher value implies a more conservative scaling down behavior, 
ensuring that tasks
-   * are not prematurely terminated in scenarios of potential workload spikes 
or insufficient cost savings.
-   */
-  private final int scaleDownBarrier;
-  /**
-   * Indicates whether task scaling down is limited to periods during task 
rollovers only.
-   * If set to {@code false}, allows scaling down during normal task run time.
-   */
+  private final boolean useTaskCountBoundaries;
+  private final int highLagThreshold;
+  private final Duration minScaleDownDelay;
   private final boolean scaleDownDuringTaskRolloverOnly;
 
   @JsonCreator
@@ -86,7 +79,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       @Nullable @JsonProperty("lagWeight") Double lagWeight,
       @Nullable @JsonProperty("idleWeight") Double idleWeight,
       @Nullable @JsonProperty("defaultProcessingRate") Double 
defaultProcessingRate,
-      @Nullable @JsonProperty("scaleDownBarrier") Integer scaleDownBarrier,
+      @Nullable @JsonProperty("useTaskCountBoundaries") Boolean 
useTaskCountBoundaries,
+      @Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
+      @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
       @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean 
scaleDownDuringTaskRolloverOnly
   )
   {
@@ -105,7 +100,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
     this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
     this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate, 
DEFAULT_PROCESSING_RATE);
-    this.scaleDownBarrier = Configs.valueOrDefault(scaleDownBarrier, 
DEFAULT_SCALE_DOWN_BARRIER);
+    this.useTaskCountBoundaries = 
Configs.valueOrDefault(useTaskCountBoundaries, false);
+    this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
+    this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, 
DEFAULT_MIN_SCALE_DELAY);
     this.scaleDownDuringTaskRolloverOnly = 
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
 
     if (this.enableTaskAutoScaler) {
@@ -133,7 +130,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
     Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 
0");
     Preconditions.checkArgument(this.defaultProcessingRate > 0, 
"defaultProcessingRate must be > 0");
-    Preconditions.checkArgument(this.scaleDownBarrier >= 0, "scaleDownBarrier 
must be >= 0");
+    Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, 
"minScaleDownDelay must be >= 0");
   }
 
   /**
@@ -212,12 +209,39 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     return defaultProcessingRate;
   }
 
+  /**
+   * Enables or disables the use of task count boundaries derived from the 
current partitions-per-task (PPT) ratio.
+   */
+  @JsonProperty("useTaskCountBoundaries")
+  public boolean shouldUseTaskCountBoundaries()
+  {
+    return useTaskCountBoundaries;
+  }
+
+  /**
+   * Per-partition lag threshold above which a burst scale-up is activated to eliminate high lag.
+   */
+  @JsonProperty("highLagThreshold")
+  public int getHighLagThreshold()
+  {
+    return highLagThreshold;
+  }
+
+  /**
+   * Represents the minimum duration between successful scale actions.
+   * A higher value implies a more conservative scaling behavior, ensuring 
that tasks
+   * are not scaled too frequently during workload fluctuations.
+   */
   @JsonProperty
-  public int getScaleDownBarrier()
+  public Duration getMinScaleDownDelay()
   {
-    return scaleDownBarrier;
+    return minScaleDownDelay;
   }
 
+  /**
+   * Indicates whether task scaling down is limited to periods during task 
rollovers only.
+   * If set to {@code false}, allows scaling down during normal task run time.
+   */
   @JsonProperty("scaleDownDuringTaskRolloverOnly")
   public boolean isScaleDownOnTaskRolloverOnly()
   {
@@ -250,10 +274,12 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            && Double.compare(that.lagWeight, lagWeight) == 0
            && Double.compare(that.idleWeight, idleWeight) == 0
            && Double.compare(that.defaultProcessingRate, 
defaultProcessingRate) == 0
-           && scaleDownBarrier == that.scaleDownBarrier
+           && useTaskCountBoundaries == that.useTaskCountBoundaries
+           && Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
            && scaleDownDuringTaskRolloverOnly == 
that.scaleDownDuringTaskRolloverOnly
            && Objects.equals(taskCountStart, that.taskCountStart)
-           && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
+           && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio)
+           && highLagThreshold == that.highLagThreshold;
   }
 
   @Override
@@ -270,7 +296,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
         lagWeight,
         idleWeight,
         defaultProcessingRate,
-        scaleDownBarrier,
+        useTaskCountBoundaries,
+        highLagThreshold,
+        minScaleDownDelay,
         scaleDownDuringTaskRolloverOnly
     );
   }
@@ -289,7 +317,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            ", lagWeight=" + lagWeight +
            ", idleWeight=" + idleWeight +
            ", defaultProcessingRate=" + defaultProcessingRate +
-           ", scaleDownBarrier=" + scaleDownBarrier +
+           ", useTaskCountBoundaries=" + useTaskCountBoundaries +
+           ", highLagThreshold=" + highLagThreshold +
+           ", minScaleDownDelay=" + minScaleDownDelay +
            ", scaleDownDuringTaskRolloverOnly=" + 
scaleDownDuringTaskRolloverOnly +
            '}';
   }
@@ -310,7 +340,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     private Double lagWeight;
     private Double idleWeight;
     private Double defaultProcessingRate;
-    private Integer scaleDownBarrier;
+    private Boolean useTaskCountBoundaries;
+    private Integer highLagThreshold;
+    private Duration minScaleDownDelay;
     private Boolean scaleDownDuringTaskRolloverOnly;
 
     private Builder()
@@ -377,9 +409,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       return this;
     }
 
-    public Builder scaleDownBarrier(int scaleDownBarrier)
+    public Builder minScaleDownDelay(Duration minScaleDownDelay)
     {
-      this.scaleDownBarrier = scaleDownBarrier;
+      this.minScaleDownDelay = minScaleDownDelay;
       return this;
     }
 
@@ -389,6 +421,18 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       return this;
     }
 
+    public Builder useTaskCountBoundaries(boolean useTaskCountBoundaries)
+    {
+      this.useTaskCountBoundaries = useTaskCountBoundaries;
+      return this;
+    }
+
+    public Builder highLagThreshold(int highLagThreshold)
+    {
+      this.highLagThreshold = highLagThreshold;
+      return this;
+    }
+
     public CostBasedAutoScalerConfig build()
     {
       return new CostBasedAutoScalerConfig(
@@ -402,7 +446,9 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
           lagWeight,
           idleWeight,
           defaultProcessingRate,
-          scaleDownBarrier,
+          useTaskCountBoundaries,
+          highLagThreshold,
+          minScaleDownDelay,
           scaleDownDuringTaskRolloverOnly
       );
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 8a375955691..01b35242ed8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -30,24 +30,11 @@ public class WeightedCostFunction
 {
   private static final Logger log = new Logger(WeightedCostFunction.class);
   /**
-   * Represents the maximum multiplier factor applied to amplify lag-based 
costs in the cost computation process.
-   * This value is used to cap the lag amplification effect to prevent 
excessively high cost inflation
-   * caused by significant partition lag.
-   * It ensures that lag-related adjustments remain bounded within a 
reasonable range for stability of
-   * cost-based auto-scaling decisions.
+   * The lag severity at which lagBusyFactor reaches 1.0 (full idle 
suppression).
+   * lagSeverity is defined as lagPerPartition / highLagThreshold.
+   * At severity=1 (threshold), lagBusyFactor=0. At severity=MAX, 
lagBusyFactor=1.0.
    */
-  private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0;
-  private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L;
-  /**
-   * It is used to calculate the denominator for the ramp formula in the cost
-   * computation logic. This value represents the difference between the 
maximum lag per
-   * partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling 
activation
-   * lag threshold 
(CostBasedAutoScaler.EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD).
-   * <p>
-   * It is impacting how the cost model evaluates scaling decisions during 
high-lag sceario.
-   */
-  private static final double RAMP_DENOMINATOR =
-      LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
+  private static final int LAG_AMPLIFICATION_MAX_SEVERITY = 5;
 
   /**
    * Computes cost for a given task count using compute time metrics.
@@ -64,7 +51,11 @@ public class WeightedCostFunction
    * @return CostResult containing totalCost, lagCost, and idleCost,
    * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
    */
-  public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, 
CostBasedAutoScalerConfig config)
+  public CostResult computeCost(
+      CostMetrics metrics,
+      int proposedTaskCount,
+      CostBasedAutoScalerConfig config
+  )
   {
     if (metrics == null || config == null || proposedTaskCount <= 0 || 
metrics.getPartitionCount() <= 0) {
       return new CostResult(Double.POSITIVE_INFINITY, 
Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
@@ -86,7 +77,7 @@ public class WeightedCostFunction
       lagRecoveryTime = metrics.getAggregateLag() / (proposedTaskCount * 
avgProcessingRate);
     }
 
-    final double predictedIdleRatio = estimateIdleRatio(metrics, 
proposedTaskCount);
+    final double predictedIdleRatio = estimateIdleRatio(metrics, 
proposedTaskCount, config.getHighLagThreshold());
     final double idleCost = proposedTaskCount * 
metrics.getTaskDurationSeconds() * predictedIdleRatio;
     final double lagCost = config.getLagWeight() * lagRecoveryTime;
     final double weightedIdleCost = config.getIdleWeight() * idleCost;
@@ -107,19 +98,16 @@ public class WeightedCostFunction
 
   /**
    * Estimates the idle ratio for a proposed task count.
-   * Includes lag-based adjustment to eliminate high lag and
-   * reduce predicted idle when work exists.
-   * <p>
-   * Formulas:
-   * {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)}
-   * {@code lagBusyFactor = 1 - exp(-lagPerTask / LAG_SCALE_FACTOR)}
-   * {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)}
+   * Includes lag-based adjustment to suppress predicted idle when lag exceeds 
the threshold,
+   * encouraging scale-up when there is real work to do.
+   * This algorithm is tuned together with {@code computeExtraPPTIncrease}, so the two can work in tandem when enabled.
    *
    * @param metrics   current system metrics containing idle ratio and task 
count
    * @param taskCount target task count to estimate an idle ratio for
    * @return estimated idle ratio in range [0.0, 1.0]
    */
-  private double estimateIdleRatio(CostMetrics metrics, int taskCount)
+  @SuppressWarnings("ExtractMethodRecommender")
+  private double estimateIdleRatio(CostMetrics metrics, int taskCount, int 
highLagThreshold)
   {
     final double currentPollIdleRatio = metrics.getPollIdleRatio();
 
@@ -138,24 +126,13 @@ public class WeightedCostFunction
     final double taskRatio = (double) taskCount / currentTaskCount;
     final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - 
busyFraction / taskRatio));
 
-    // Lag-based adjustment: more work per task → less idle
-    final double lagPerTask = metrics.getAggregateLag() / taskCount;
-    double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / 
CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD);
-    final int partitionCount = metrics.getPartitionCount();
-
-    if (partitionCount > 0) {
-      final double lagPerPartition = metrics.getAggregateLag() / 
partitionCount;
-      // Lag-amplified idle decay
-      if (lagPerPartition >= 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
-        double ramp = Math.max(0.0,
-                               (lagPerPartition - 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD)
-                               / RAMP_DENOMINATOR
-        );
-        ramp = Math.min(1.0, ramp);
-
-        final double multiplier = 1.0 + ramp * 
(LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0);
-        lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier);
-      }
+    final double lagPerPartition = metrics.getAggregateLag() / 
metrics.getPartitionCount();
+    double lagBusyFactor = 0.;
+
+    // Lag-amplified idle decay using ln(lagSeverity) / ln(maxSeverity).
+    if (highLagThreshold > 0 && lagPerPartition >= highLagThreshold) {
+      final double lagSeverity = lagPerPartition / highLagThreshold;
+      lagBusyFactor = Math.min(1.0, Math.log(lagSeverity) / 
Math.log(LAG_AMPLIFICATION_MAX_SEVERITY));
     }
 
     // Clamp to valid range [0, 1]
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index 364e0d75b80..96e15672bd1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -21,15 +21,16 @@ package 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_PROCESSING_RATE;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
-import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_DOWN_BARRIER;
 
 public class CostBasedAutoScalerConfigTest
 {
@@ -50,7 +51,8 @@ public class CostBasedAutoScalerConfigTest
                   + "  \"lagWeight\": 0.6,\n"
                   + "  \"idleWeight\": 0.4,\n"
                   + "  \"defaultProcessingRate\": 2000.0,\n"
-                  + "  \"scaleDownBarrier\": 10,\n"
+                  + "  \"highLagThreshold\": 30000,\n"
+                  + "  \"minScaleDownDelay\": \"PT10M\",\n"
                   + "  \"scaleDownDuringTaskRolloverOnly\": true\n"
                   + "}";
 
@@ -66,8 +68,9 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
     Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
     Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
-    Assert.assertEquals(10, config.getScaleDownBarrier());
+    Assert.assertEquals(Duration.standardMinutes(10), 
config.getMinScaleDownDelay());
     Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
+    Assert.assertEquals(30000, config.getHighLagThreshold());
 
     // Test serialization back to JSON
     String serialized = mapper.writeValueAsString(config);
@@ -94,14 +97,19 @@ public class CostBasedAutoScalerConfigTest
 
     // Check defaults
     Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, 
config.getScaleActionPeriodMillis());
-    Assert.assertEquals(DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS, 
config.getMinTriggerScaleActionFrequencyMillis());
+    Assert.assertEquals(
+        DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS,
+        config.getMinTriggerScaleActionFrequencyMillis()
+    );
     Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
     Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
     Assert.assertEquals(DEFAULT_PROCESSING_RATE, 
config.getDefaultProcessingRate(), 0.001);
-    Assert.assertEquals(DEFAULT_SCALE_DOWN_BARRIER, 
config.getScaleDownBarrier());
+    Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, 
config.getMinScaleDownDelay());
     Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
     Assert.assertNull(config.getTaskCountStart());
     Assert.assertNull(config.getStopTaskCountRatio());
+    // When highLagThreshold is not set, it defaults to -1 (burst scale-up 
disabled)
+    Assert.assertEquals(-1, config.getHighLagThreshold());
   }
 
   @Test
@@ -174,19 +182,20 @@ public class CostBasedAutoScalerConfigTest
   public void testBuilder()
   {
     CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
-                                                                 
.taskCountMax(100)
-                                                                 
.taskCountMin(5)
-                                                                 
.taskCountStart(10)
-                                                                 
.enableTaskAutoScaler(true)
-                                                                 
.minTriggerScaleActionFrequencyMillis(600000L)
-                                                                 
.stopTaskCountRatio(0.8)
-                                                                 
.scaleActionPeriodMillis(60000L)
-                                                                 
.lagWeight(0.6)
-                                                                 
.idleWeight(0.4)
-                                                                 
.defaultProcessingRate(2000.0)
-                                                                 
.scaleDownBarrier(10)
-                                                                 
.scaleDownDuringTaskRolloverOnly(true)
-                                                                 .build();
+                                                                
.taskCountMax(100)
+                                                                
.taskCountMin(5)
+                                                                
.taskCountStart(10)
+                                                                
.enableTaskAutoScaler(true)
+                                                                
.minTriggerScaleActionFrequencyMillis(600000L)
+                                                                
.stopTaskCountRatio(0.8)
+                                                                
.scaleActionPeriodMillis(60000L)
+                                                                .lagWeight(0.6)
+                                                                
.idleWeight(0.4)
+                                                                
.defaultProcessingRate(2000.0)
+                                                                
.minScaleDownDelay(Duration.standardMinutes(10))
+                                                                
.scaleDownDuringTaskRolloverOnly(true)
+                                                                
.highLagThreshold(30000)
+                                                                .build();
 
     Assert.assertTrue(config.getEnableTaskAutoScaler());
     Assert.assertEquals(100, config.getTaskCountMax());
@@ -198,7 +207,8 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
     Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
     Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
-    Assert.assertEquals(10, config.getScaleDownBarrier());
+    Assert.assertEquals(Duration.standardMinutes(10), 
config.getMinScaleDownDelay());
     Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
+    Assert.assertEquals(30000, config.getHighLagThreshold());
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
new file mode 100644
index 00000000000..122a449c5f7
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CostBasedAutoScaler#computeExtraPPTIncrease}.
+ * <p>
+ * The burst scaling uses a logarithmic formula:
+ * {@code deltaTasks = K * ln(lagSeverity)}
+ * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}.
+ */
+public class CostBasedAutoScalerHighLagScalingTest
+{
+  private static final int LAG_THRESHOLD = 50_000;
+  private static final int PARTITION_COUNT = 48;
+  private static final int TASK_COUNT_MAX = 48;
+
+  /**
+   * Tests scaling behavior across different lag levels and task counts.
+   * <p>
+   * Expected behavior for 48 partitions with threshold=50K:
+   * <pre>
+   * | Current | Lag/Part | PPT reduction | Notes                                   |
+   * |---------|----------|---------------|-----------------------------------------|
+   * | any     | <50K     | 0             | Below threshold                         |
+   * | any     | =50K     | 0             | ln(1) = 0                               |
+   * | 1       | 100K     | 40            | Significant boost for recovery          |
+   * | 1       | 200K     | 43            | Large boost                             |
+   * | 4       | 200K     | 6             | Moderate boost                          |
+   * | 12      | 200K     | 0             | Delta too small for PPT change          |
+   * | 24      | 200K     | 0             | Delta too small for PPT change          |
+   * </pre>
+   */
+  @Test
+  public void testComputeExtraPPTIncrease()
+  {
+    // Below threshold: no boost
+    Assert.assertEquals(
+        "Below threshold should not increase PPT",
+        0,
+        CostBasedAutoScaler.computeExtraPPTIncrease(
+            LAG_THRESHOLD,
+            PARTITION_COUNT * 40_000L,
+            PARTITION_COUNT,
+            4,
+            TASK_COUNT_MAX
+        )
+    );
+
+    // At threshold (lagSeverity=1, ln(1)=0): no boost
+    Assert.assertEquals(
+        "At threshold (ln(1)=0) should not increase PPT",
+        0,
+        CostBasedAutoScaler.computeExtraPPTIncrease(
+            LAG_THRESHOLD,
+            PARTITION_COUNT * 50_000L,
+            PARTITION_COUNT,
+            4,
+            TASK_COUNT_MAX
+        )
+    );
+
+    // C=1, 100K lag (2x threshold): significant boost for emergency recovery
+    int boost1_100k = CostBasedAutoScaler.computeExtraPPTIncrease(
+        LAG_THRESHOLD,
+        PARTITION_COUNT * 100_000L,
+        PARTITION_COUNT,
+        1,
+        TASK_COUNT_MAX
+    );
+    Assert.assertEquals("C=1, 100K lag boost", 40, boost1_100k);
+
+    // C=1, 200K lag (4x threshold): large boost
+    int boost1_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+        LAG_THRESHOLD,
+        PARTITION_COUNT * 200_000L,
+        PARTITION_COUNT,
+        1,
+        TASK_COUNT_MAX
+    );
+    Assert.assertEquals("C=1, 200K lag boost", 43, boost1_200k);
+
+    // C=4, 200K lag: moderate boost (K decreases with sqrt(C))
+    int boost4_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+        LAG_THRESHOLD,
+        PARTITION_COUNT * 200_000L,
+        PARTITION_COUNT,
+        4,
+        TASK_COUNT_MAX
+    );
+    Assert.assertEquals("C=4, 200K lag should yield a modest PPT increase", 6, 
boost4_200k);
+
+    // C=12, 200K lag: delta too small to change PPT
+    int boost12_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+        LAG_THRESHOLD,
+        PARTITION_COUNT * 200_000L,
+        PARTITION_COUNT,
+        12,
+        TASK_COUNT_MAX
+    );
+    Assert.assertEquals("C=12, 200K lag should not change PPT", 0, 
boost12_200k);
+
+    // C=24, 200K lag: delta too small to change PPT
+    int boost24_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
+        LAG_THRESHOLD,
+        PARTITION_COUNT * 200_000L,
+        PARTITION_COUNT,
+        24,
+        TASK_COUNT_MAX
+    );
+    Assert.assertEquals("C=24, 200K lag should not change PPT", 0, 
boost24_200k);
+  }
+
+  @Test
+  public void testComputeExtraPPTIncreaseInvalidInputs()
+  {
+    Assert.assertEquals(
+        0,
+        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 
0, 4, 48)
+    );
+    Assert.assertEquals(
+        0,
+        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 
48, 0, 48)
+    );
+    Assert.assertEquals(
+        0,
+        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 
48, 4, 0)
+    );
+    Assert.assertEquals(
+        0,
+        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 
-1, 4, 48)
+    );
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index becfd4964b4..d0e9f90f584 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -77,13 +77,13 @@ public class CostBasedAutoScalerMockTest
   @Test
   public void testScaleUpWhenOptimalGreaterThanCurrent()
   {
-    // Use config with barrier=2 to test counter reset behavior
+    // Use config with a long barrier to test cooldown behavior
     CostBasedAutoScalerConfig barrierConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                        
.taskCountMax(100)
-                                                                        
.taskCountMin(1)
-                                                                        
.enableTaskAutoScaler(true)
-                                                                        
.scaleDownBarrier(2)
-                                                                        
.build();
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.minScaleDownDelay(Duration.standardHours(1))
+                                                                       
.build();
 
     CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
         mockSupervisor,
@@ -94,14 +94,7 @@ public class CostBasedAutoScalerMockTest
 
     int currentTaskCount = 10;
     int scaleUpOptimal = 17;
-    int scaleDownOptimal = 5;
-
-    // First, increment the scaleDownCounter by making a scale-down attempt
-    doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
-    Assert.assertEquals("Scale-down blocked (counter=1)", -1, 
autoScaler.computeTaskCountForScaleAction());
-
-    // Now trigger scale-up, which should reset the counter
+    // Trigger scale-up, which should set the cooldown timer
     doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(currentTaskCount, 5000.0, 0.1);
 
@@ -111,11 +104,11 @@ public class CostBasedAutoScalerMockTest
         autoScaler.computeTaskCountForScaleAction()
     );
 
-    // Verify counter was reset: scale-down should be blocked again (counter 
starts from 0)
-    doReturn(scaleDownOptimal).when(autoScaler).computeOptimalTaskCount(any());
+    // Verify cooldown blocks immediate subsequent scaling
+    doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
     Assert.assertEquals(
-        "Scale-down should be blocked after scale-up reset the counter",
+        "Scale action should be blocked during the cooldown window",
         -1,
         autoScaler.computeTaskCountForScaleAction()
     );
@@ -140,13 +133,13 @@ public class CostBasedAutoScalerMockTest
   @Test
   public void testScaleDownBlockedReturnsMinusOne()
   {
-    // Use config with barrier=2 to test counter behavior
+    // Use config with a long barrier to test cooldown behavior
     CostBasedAutoScalerConfig barrierConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                        
.taskCountMax(100)
-                                                                        
.taskCountMin(1)
-                                                                        
.enableTaskAutoScaler(true)
-                                                                        
.scaleDownBarrier(2)
-                                                                        
.build();
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.minScaleDownDelay(Duration.standardHours(1))
+                                                                       
.build();
 
     CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
         mockSupervisor,
@@ -161,23 +154,16 @@ public class CostBasedAutoScalerMockTest
     doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
 
-    // First attempt: counter=1, blocked
+    // First attempt: allowed (no prior scale action)
     Assert.assertEquals(
-        "Should return -1 when optimal is less than current (scale-down 
blocked, counter=1)",
-        -1,
-        autoScaler.computeTaskCountForScaleAction()
-    );
-
-    // Second attempt: counter=2, succeeds (barrier reached)
-    Assert.assertEquals(
-        "Scale-down should succeed when barrier reached",
+        "Scale-down should succeed when no prior scale action exists",
         optimalCount,
         autoScaler.computeTaskCountForScaleAction()
     );
 
-    // Verify counter was reset: next scale-down should be blocked again
+    // Second attempt: blocked by cooldown
     Assert.assertEquals(
-        "Scale-down should be blocked after successful scale-down reset the 
counter",
+        "Scale-down should be blocked during the cooldown window",
         -1,
         autoScaler.computeTaskCountForScaleAction()
     );
@@ -300,8 +286,8 @@ public class CostBasedAutoScalerMockTest
     int result = autoScaler.computeTaskCountForScaleAction();
 
     Assert.assertEquals(
-        "Should block scale-down even by one task",
-        -1,
+        "Should allow scale-down by one task when cooldown has elapsed",
+        optimalCount,
         result
     );
   }
@@ -314,7 +300,7 @@ public class CostBasedAutoScalerMockTest
                                                                             .taskCountMin(1)
                                                                             .enableTaskAutoScaler(true)
                                                                             .scaleDownDuringTaskRolloverOnly(true)
-                                                                            .scaleDownBarrier(1) // Set the barrier to 1 so it would trigger immediately if not blocked
+                                                                            .minScaleDownDelay(Duration.ZERO)
                                                                             .build();
 
     CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
@@ -345,6 +331,7 @@ public class CostBasedAutoScalerMockTest
                                                                             .taskCountMin(1)
                                                                             .enableTaskAutoScaler(true)
                                                                             .scaleDownDuringTaskRolloverOnly(true)
+                                                                            .minScaleDownDelay(Duration.ZERO)
                                                                             .build();
 
     CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
@@ -357,11 +344,12 @@ public class CostBasedAutoScalerMockTest
     int currentTaskCount = 50;
     int optimalCount = 30;
 
-    // Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
-    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    // Set up lastKnownMetrics by calling computeTaskCountForScaleAction first without scaling
+    doReturn(currentTaskCount).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
     autoScaler.computeTaskCountForScaleAction(); // This populates lastKnownMetrics
 
+    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
     Assert.assertEquals(
         "Should scale-down during rollover when 
scaleDownDuringTaskRolloverOnly is true",
         optimalCount,
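
The mock-test changes above replace the old attempt-counter barrier with a
time-based cooldown driven by minScaleDownDelay. A minimal sketch of that kind
of gating, using joda-time as the tests do; the names here (ScaleDownCooldownSketch,
lastScaleActionTime, tryScaleDown) are illustrative and not taken from the actual
CostBasedAutoScaler implementation:

    import org.joda.time.DateTime;
    import org.joda.time.Duration;

    // Illustrative cooldown gate: a scale-down is allowed only when at least
    // minScaleDownDelay has elapsed since the last recorded scale action.
    class ScaleDownCooldownSketch
    {
      private final Duration minScaleDownDelay;
      private DateTime lastScaleActionTime;   // hypothetical field name

      ScaleDownCooldownSketch(Duration minScaleDownDelay)
      {
        this.minScaleDownDelay = minScaleDownDelay;
      }

      boolean tryScaleDown(DateTime now)
      {
        if (lastScaleActionTime != null
            && new Duration(lastScaleActionTime, now).isShorterThan(minScaleDownDelay)) {
          return false;   // still inside the cooldown window, mirroring the -1 results asserted above
        }
        lastScaleActionTime = now;
        return true;
      }
    }

With Duration.ZERO the gate never blocks, which is why the rollover-only tests set
minScaleDownDelay(Duration.ZERO) so that only the rollover condition is exercised.
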
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 5deae662d75..bb6eca691a2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -25,12 +25,12 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -38,17 +38,13 @@ import java.util.Map;
 import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
 import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
 import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
-import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD;
-import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
-import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease;
 import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("SameParameterValue")
+@SuppressWarnings({"SameParameterValue", "InstantiationOfUtilityClass"})
 public class CostBasedAutoScalerTest
 {
   private CostBasedAutoScaler autoScaler;
-  private CostBasedAutoScalerConfig config;
 
   @Before
   public void setUp()
@@ -62,169 +58,106 @@ public class CostBasedAutoScalerTest
     when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
     when(mockIoConfig.getStream()).thenReturn("test-stream");
 
-    config = CostBasedAutoScalerConfig.builder()
-                                      .taskCountMax(100)
-                                      .taskCountMin(1)
-                                      .enableTaskAutoScaler(true)
-                                      .lagWeight(0.6)
-                                      .idleWeight(0.4)
-                                      .build();
+    CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
+                                                                .taskCountMax(100)
+                                                                .taskCountMin(1)
+                                                                .enableTaskAutoScaler(true)
+                                                                .lagWeight(0.6)
+                                                                .idleWeight(0.4)
+                                                                .build();
 
     autoScaler = new CostBasedAutoScaler(mockSupervisor, config, mockSupervisorSpec, mockEmitter);
   }
 
+  @SuppressWarnings("InstantiationOfUtilityClass")
   @Test
   public void testComputeValidTaskCounts()
   {
-    // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34
-    int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 1, 100);
-    Assert.assertTrue("Should contain the current task count", contains(validTaskCounts, 25));
-    Assert.assertTrue("Should contain the next scale-up option", contains(validTaskCounts, 34));
+    boolean useTaskCountBoundaries = true;
+    int highLagThreshold = 50_000;
 
-    // Edge cases
-    Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 1, 100).length);
-    Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 1, 100).length);
+    // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34
+    int[] validTaskCounts = computeValidTaskCounts(
+        100,
+        25,
+        0L,
+        1,
+        100,
+        useTaskCountBoundaries,
+        highLagThreshold
+    );
+    Assert.assertTrue("Expected current task count to be included", 
contains(validTaskCounts, 25));
+    Assert.assertTrue("Expected next scale-up option (34) to be included", 
contains(validTaskCounts, 34));
 
     // Single partition
-    int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 1, 100);
-    Assert.assertTrue("Single partition should have at least one valid count", 
singlePartition.length > 0);
-    Assert.assertTrue("Single partition should contain 1", 
contains(singlePartition, 1));
+    int[] singlePartition = computeValidTaskCounts(
+        1,
+        1,
+        0L,
+        1,
+        100,
+        useTaskCountBoundaries,
+        highLagThreshold
+    );
+    Assert.assertTrue("Single partition should yield at least one valid 
count", singlePartition.length > 0);
+    Assert.assertTrue("Single partition should include task count 1", 
contains(singlePartition, 1));
 
     // Current exceeds partitions - should still yield valid, deduplicated options
-    int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 1, 100);
+    int[] exceedsPartitions = computeValidTaskCounts(
+        2,
+        5,
+        0L,
+        1,
+        100,
+        useTaskCountBoundaries,
+        highLagThreshold
+    );
     Assert.assertEquals(2, exceedsPartitions.length);
     Assert.assertTrue(contains(exceedsPartitions, 1));
     Assert.assertTrue(contains(exceedsPartitions, 2));
 
-    // Lag expansion: low lag should not include max, high lag should
-    int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30);
+    // Lag expansion: low lag should not include max, high lag should allow aggressive scaling
+    int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30, useTaskCountBoundaries, highLagThreshold);
     Assert.assertFalse("Low lag should not include max task count", contains(lowLagCounts, 30));
-    Assert.assertTrue("Low lag should cap scale up around 4 tasks", contains(lowLagCounts, 4));
+    Assert.assertTrue("Low lag should cap scale-up around 4 tasks", contains(lowLagCounts, 4));
 
+    // High lag uses logarithmic formula: K * ln(lagSeverity) where K = P/(6.4*sqrt(C))
+    // For P=30, C=3, lagPerPartition=500K, threshold=50K: lagSeverity=10, K=2.7, delta=6.2
+    // This allows controlled scaling to ~10-15 tasks (not all the way to max)
     long highAggregateLag = 30L * 500_000L;
-    int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 1, 30);
-    Assert.assertTrue("High lag should allow scaling to max tasks", contains(highLagCounts, 30));
+    int[] highLagCounts = computeValidTaskCounts(
+        30,
+        3,
+        highAggregateLag,
+        1,
+        30,
+        useTaskCountBoundaries,
+        highLagThreshold
+    );
+    Assert.assertTrue("High lag should allow scaling to 10 tasks", 
contains(highLagCounts, 10));
+    Assert.assertTrue("High lag should allow scaling to 15 tasks", 
contains(highLagCounts, 15));
+    Assert.assertFalse("High lag should not jump straight to max (30) from 3", 
contains(highLagCounts, 30));
 
     // Respects taskCountMax
-    int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3);
-    Assert.assertTrue("Should include taskCountMax when doable", 
contains(cappedCounts, 3));
+    int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3, 
useTaskCountBoundaries, highLagThreshold);
+    Assert.assertTrue("Should include taskCountMax when within bounds", 
contains(cappedCounts, 3));
     Assert.assertFalse("Should not exceed taskCountMax", 
contains(cappedCounts, 4));
 
     // Respects taskCountMin - filters out values below the minimum
     // With partitionCount=100, currentTaskCount=10, the computed range includes values like 8, 9, 10, 12, 13
-    int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100);
-    Assert.assertFalse("Should not go below taskCountMin", contains(minCappedCounts, 8));
-    Assert.assertFalse("Should not go below taskCountMin", contains(minCappedCounts, 9));
-    Assert.assertTrue("Should include values at taskCountMin", contains(minCappedCounts, 10));
-    Assert.assertTrue("Should include values above taskCountMin", contains(minCappedCounts, 12));
+    int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100, useTaskCountBoundaries, highLagThreshold);
+    Assert.assertFalse("Should not include values below taskCountMin (8)", contains(minCappedCounts, 8));
+    Assert.assertFalse("Should not include values below taskCountMin (9)", contains(minCappedCounts, 9));
+    Assert.assertTrue("Should include values at taskCountMin (10)", contains(minCappedCounts, 10));
+    Assert.assertTrue("Should include values above taskCountMin (12)", contains(minCappedCounts, 12));
 
     // Both bounds applied together
-    int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12);
-    Assert.assertFalse("Should not go below taskCountMin", 
contains(bothBounds, 8));
-    Assert.assertFalse("Should not go below taskCountMin", 
contains(bothBounds, 9));
-    Assert.assertFalse("Should not exceed taskCountMax", contains(bothBounds, 
13));
-    Assert.assertTrue("Should include values at taskCountMin", 
contains(bothBounds, 10));
-    Assert.assertTrue("Should include values at taskCountMax", 
contains(bothBounds, 12));
-  }
-
-  @Test
-  public void testScalingExamplesTable()
-  {
-    int partitionCount = 30;
-    int taskCountMax = 30;
-    double pollIdleRatio = 0.1;
-    double avgProcessingRate = 10.0;
-
-    // Create a local autoScaler with taskCountMax matching the test parameters
-    SupervisorSpec mockSpec = Mockito.mock(SupervisorSpec.class);
-    SeekableStreamSupervisor mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
-    ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class);
-    SeekableStreamSupervisorIOConfig mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
-
-    when(mockSpec.getId()).thenReturn("test-supervisor");
-    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
-    when(mockIoConfig.getStream()).thenReturn("test-stream");
-
-    CostBasedAutoScalerConfig localConfig = CostBasedAutoScalerConfig.builder()
-                                                                      .taskCountMax(taskCountMax)
-                                                                      .taskCountMin(1)
-                                                                      .enableTaskAutoScaler(true)
-                                                                      .lagWeight(0.6)
-                                                                      .idleWeight(0.4)
-                                                                      .build();
-
-    CostBasedAutoScaler localAutoScaler = new CostBasedAutoScaler(
-        mockSupervisor,
-        localConfig,
-        mockSpec,
-        mockEmitter
-    );
-
-    class Example
-    {
-      final int currentTasks;
-      final long lagPerPartition;
-      final int expectedTasks;
-
-      Example(int currentTasks, long lagPerPartition, int expectedTasks)
-      {
-        this.currentTasks = currentTasks;
-        this.lagPerPartition = lagPerPartition;
-        this.expectedTasks = expectedTasks;
-      }
-    }
-
-    Example[] examples = new Example[]{
-        new Example(3, EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 8),
-        new Example(3, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 3, 15),
-        new Example(3, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 5, 30),
-        new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD - 1, 15),
-        new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 3, 30),
-        new Example(10, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30),
-        new Example(20, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30),
-        new Example(25, AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30)
-    };
-
-    for (Example example : examples) {
-      long aggregateLag = example.lagPerPartition * partitionCount;
-      int[] validCounts = computeValidTaskCounts(partitionCount, example.currentTasks, aggregateLag, 1, taskCountMax);
-      Assert.assertTrue(
-          "Should include expected task count for current=" + example.currentTasks + ", lag=" + example.lagPerPartition,
-          contains(validCounts, example.expectedTasks)
-      );
-
-      CostMetrics metrics = createMetricsWithRate(
-          example.lagPerPartition,
-          example.currentTasks,
-          partitionCount,
-          pollIdleRatio,
-          avgProcessingRate
-      );
-      int actualOptimal = localAutoScaler.computeOptimalTaskCount(metrics);
-      if (actualOptimal == -1) {
-        actualOptimal = example.currentTasks;
-      }
-      Assert.assertEquals(
-          "Optimal task count should match for current=" + example.currentTasks
-          + ", lag=" + example.lagPerPartition
-          + ", valid=" + Arrays.toString(validCounts),
-          example.expectedTasks,
-          actualOptimal
-      );
-    }
-  }
-
-  @Test
-  public void testComputeExtraPPTIncrease()
-  {
-    // No extra increase below the threshold
-    Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD - 1, 30, 3, 30));
-    Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30));
-
-    // More aggressive increase when the lag is high
-    Assert.assertEquals(8, computeExtraMaxPartitionsPerTaskIncrease(30L * AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 5, 30, 3, 30));
-    // Zero when on max task count
-    Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD * 10, 30, 30, 30));
+    int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12, useTaskCountBoundaries, highLagThreshold);
+    Assert.assertFalse("Should not include values below taskCountMin (8)", contains(bothBounds, 8));
+    Assert.assertFalse("Should not include values below taskCountMin (9)", contains(bothBounds, 9));
+    Assert.assertFalse("Should not include values above taskCountMax (13)", contains(bothBounds, 13));
+    Assert.assertTrue("Should include values at taskCountMin (10)", contains(bothBounds, 10));
+    Assert.assertTrue("Should include values at taskCountMax (12)", contains(bothBounds, 12));
   }
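
The comment in testComputeValidTaskCounts gives the high-lag expansion as
delta = K * ln(lagSeverity) with K = P / (6.4 * sqrt(C)). A small worked sketch of
that arithmetic for the quoted numbers; the class and method names below are
illustrative only, not the actual CostBasedAutoScaler internals:

    // Illustrative arithmetic for P = 30 partitions, C = 3 tasks,
    // lagPerPartition = 500_000 and highLagThreshold = 50_000.
    public class HighLagExpansionSketch
    {
      static double taskCountDelta(int partitionCount, int currentTaskCount,
                                   long lagPerPartition, long highLagThreshold)
      {
        double lagSeverity = (double) lagPerPartition / highLagThreshold;  // 10.0
        double k = partitionCount / (6.4 * Math.sqrt(currentTaskCount));   // ~2.7
        return k * Math.log(lagSeverity);                                  // ~6.2
      }

      public static void main(String[] args)
      {
        // Prints roughly 6.2: from 3 tasks the candidate window grows to around
        // 9-10 tasks instead of jumping straight to the maximum of 30.
        System.out.println(taskCountDelta(30, 3, 500_000L, 50_000L));
      }
    }

Because K shrinks as sqrt(C) grows, low task counts get large deltas (fast recovery)
while high task counts get smaller, more conservative steps.
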
 
   @Test
@@ -238,28 +171,38 @@ public class CostBasedAutoScalerTest
 
     // High idle (underutilized) - should scale down
     int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
-    Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult < 25);
+    Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)", scaleDownResult < 25);
 
     // Very high idle with high task count - should scale down
     int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
-    Assert.assertTrue("Scale down scenario should return optimal <= current", highIdleResult <= 50);
+    Assert.assertTrue("High idle should not suggest scale-up", highIdleResult <= 50);
 
     // With low idle and balanced weights, the algorithm should not scale up aggressively
     int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
-    Assert.assertTrue("With low idle and balanced weights, should not scale up aggressively", lowIdleResult <= 25);
+    Assert.assertTrue("With low idle and balanced weights, avoid aggressive scale-up", lowIdleResult <= 25);
   }
 
   @Test
   public void testExtractPollIdleRatio()
   {
     // Null and empty return 0
-    Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(null), 0.0001);
-    Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()), 0.0001);
+    Assert.assertEquals("Null stats should yield 0 idle ratio", 0., CostBasedAutoScaler.extractPollIdleRatio(null), 0.0001);
+    Assert.assertEquals(
+        "Empty stats should yield 0 idle ratio",
+        0.,
+        CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()),
+        0.0001
+    );
 
     // Missing metrics return 0
     Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
     missingMetrics.put("0", Collections.singletonMap("task-0", new 
HashMap<>()));
-    Assert.assertEquals(0., 
CostBasedAutoScaler.extractPollIdleRatio(missingMetrics), 0.0001);
+    Assert.assertEquals(
+        "Missing autoscaler metrics should yield 0 idle ratio",
+        0.,
+        CostBasedAutoScaler.extractPollIdleRatio(missingMetrics),
+        0.0001
+    );
 
     // Valid stats return average
     Map<String, Map<String, Object>> validStats = new HashMap<>();
@@ -267,26 +210,46 @@ public class CostBasedAutoScalerTest
     group.put("task-0", buildTaskStatsWithPollIdle(0.3));
     group.put("task-1", buildTaskStatsWithPollIdle(0.5));
     validStats.put("0", group);
-    Assert.assertEquals(0.4, CostBasedAutoScaler.extractPollIdleRatio(validStats), 0.0001);
+    Assert.assertEquals(
+        "Average poll idle ratio should be computed across tasks",
+        0.4,
+        CostBasedAutoScaler.extractPollIdleRatio(validStats),
+        0.0001
+    );
 
     // Invalid types: non-map task metric
     Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
     nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
-    Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
+    Assert.assertEquals(
+        "Non-map task stats should be ignored",
+        0.,
+        CostBasedAutoScaler.extractPollIdleRatio(nonMapTask),
+        0.0001
+    );
 
     // Invalid types: empty autoscaler metrics
     Map<String, Map<String, Object>> emptyAutoscaler = new HashMap<>();
     Map<String, Object> taskStats1 = new HashMap<>();
     taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new HashMap<>());
     emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
-    Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
+    Assert.assertEquals(
+        "Empty autoscaler metrics should yield 0 idle ratio",
+        0.,
+        CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler),
+        0.0001
+    );
 
     // Invalid types: non-map autoscaler metrics
     Map<String, Map<String, Object>> nonMapAutoscaler = new HashMap<>();
     Map<String, Object> taskStats2 = new HashMap<>();
     taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, "not-a-map");
     nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
-    Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
+    Assert.assertEquals(
+        "Non-map autoscaler metrics should be ignored",
+        0.,
+        CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler),
+        0.0001
+    );
 
     // Invalid types: non-number poll idle ratio
     Map<String, Map<String, Object>> nonNumberRatio = new HashMap<>();
@@ -295,20 +258,35 @@ public class CostBasedAutoScalerTest
     autoscalerMetrics.put(SeekableStreamIndexTaskRunner.POLL_IDLE_RATIO_KEY, "not-a-number");
     taskStats3.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, autoscalerMetrics);
     nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
-    Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio), 0.0001);
+    Assert.assertEquals(
+        "Non-numeric poll idle ratio should be ignored",
+        0.,
+        CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio),
+        0.0001
+    );
   }
 
   @Test
   public void testExtractMovingAverage()
   {
     // Null and empty return -1
-    Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(null), 0.0001);
-    Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()), 0.0001);
+    Assert.assertEquals("Null stats should yield -1 moving average", -1., CostBasedAutoScaler.extractMovingAverage(null), 0.0001);
+    Assert.assertEquals(
+        "Empty stats should yield -1 moving average",
+        -1.,
+        CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()),
+        0.0001
+    );
 
     // Missing metrics return -1
     Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
     missingMetrics.put("0", Collections.singletonMap("task-0", new 
HashMap<>()));
-    Assert.assertEquals(-1., 
CostBasedAutoScaler.extractMovingAverage(missingMetrics), 0.0001);
+    Assert.assertEquals(
+        "Missing moving averages should yield -1",
+        -1.,
+        CostBasedAutoScaler.extractMovingAverage(missingMetrics),
+        0.0001
+    );
 
     // Valid stats return average (using 5-minute)
     Map<String, Map<String, Object>> validStats = new HashMap<>();
@@ -316,32 +294,80 @@ public class CostBasedAutoScalerTest
     group.put("task-0", buildTaskStatsWithMovingAverage(1000.0));
     group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
     validStats.put("0", group);
-    Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
+    Assert.assertEquals(
+        "Average 5-minute processing rate should be computed across tasks",
+        1500.0,
+        CostBasedAutoScaler.extractMovingAverage(validStats),
+        0.0001
+    );
 
     // Interval fallback: 15-minute preferred, then 5-minute, then 1-minute
     Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
-    fifteenMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
-    Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
+    fifteenMin.put(
+        "0",
+        Collections.singletonMap(
+            "task-0",
+            buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)
+        )
+    );
+    Assert.assertEquals(
+        "15-minute interval should be preferred when available",
+        1500.0,
+        CostBasedAutoScaler.extractMovingAverage(fifteenMin),
+        0.0001
+    );
 
     // 1-minute as a final fallback
     Map<String, Map<String, Object>> oneMin = new HashMap<>();
-    oneMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
-    Assert.assertEquals(500.0, 
CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
+    oneMin.put(
+        "0",
+        Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0))
+    );
+    Assert.assertEquals(
+        "1-minute interval should be used as a final fallback",
+        500.0,
+        CostBasedAutoScaler.extractMovingAverage(oneMin),
+        0.0001
+    );
 
     // 15-minute preferred over 5-minute when both available
     Map<String, Map<String, Object>> allIntervals = new HashMap<>();
-    allIntervals.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0)));
-    Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(allIntervals), 0.0001);
+    allIntervals.put(
+        "0",
+        Collections.singletonMap("task-0", 
buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0))
+    );
+    Assert.assertEquals(
+        "15-minute interval should win when multiple intervals are present",
+        1500.0,
+        CostBasedAutoScaler.extractMovingAverage(allIntervals),
+        0.0001
+    );
 
     // Falls back to 5-minute when 15-minute is null
     Map<String, Map<String, Object>> nullFifteen = new HashMap<>();
-    nullFifteen.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)));
-    Assert.assertEquals(750.0, 
CostBasedAutoScaler.extractMovingAverage(nullFifteen), 0.0001);
+    nullFifteen.put(
+        "0",
+        Collections.singletonMap(
+            "task-0",
+            buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)
+        )
+    );
+    Assert.assertEquals(
+        "Should fall back to 5-minute when 15-minute is null",
+        750.0,
+        CostBasedAutoScaler.extractMovingAverage(nullFifteen),
+        0.0001
+    );
 
     // Falls back to 1-minute when both 15 and 5 are null
     Map<String, Map<String, Object>> bothNull = new HashMap<>();
     bothNull.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithTwoNullIntervals(250.0)));
-    Assert.assertEquals(250.0, 
CostBasedAutoScaler.extractMovingAverage(bothNull), 0.0001);
+    Assert.assertEquals(
+        "Should fall back to 1-minute when 15 and 5 are null",
+        250.0,
+        CostBasedAutoScaler.extractMovingAverage(bothNull),
+        0.0001
+    );
   }
 
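
The extractMovingAverage assertions above all exercise the same fallback order:
prefer the 15-minute window, then 5-minute, then 1-minute, and return -1 when nothing
usable is present. A compact sketch of that selection; the map shape and the interval
keys ("15m", "5m", "1m") are placeholders rather than the exact
DropwizardRowIngestionMeters constants:

    import java.util.Map;

    // Illustrative only: pick the longest available moving-average window.
    class MovingAverageFallbackSketch
    {
      static double pickProcessingRate(Map<String, Number> movingAverages)
      {
        // Mirrors the test expectations: FIFTEEN_MINUTE_NAME, then FIVE_MINUTE_NAME, then ONE_MINUTE_NAME.
        for (String interval : new String[]{"15m", "5m", "1m"}) {  // placeholder keys
          Number value = movingAverages == null ? null : movingAverages.get(interval);
          if (value != null) {
            return value.doubleValue();
          }
        }
        return -1.0;  // the tests expect -1 when no interval is available
      }
    }
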
   @Test
@@ -350,19 +376,34 @@ public class CostBasedAutoScalerTest
     // Non-map task metric
     Map<String, Map<String, Object>> nonMapTask = new HashMap<>();
     nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
-    Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
+    Assert.assertEquals(
+        "Non-map task stats should be ignored",
+        -1.,
+        CostBasedAutoScaler.extractMovingAverage(nonMapTask),
+        0.0001
+    );
 
     Map<String, Map<String, Object>> missingBuild = new HashMap<>();
     Map<String, Object> taskStats1 = new HashMap<>();
     taskStats1.put("movingAverages", new HashMap<>());
     missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
-    Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
+    Assert.assertEquals(
+        "Missing buildSegments moving average should yield -1",
+        -1.,
+        CostBasedAutoScaler.extractMovingAverage(missingBuild),
+        0.0001
+    );
 
     Map<String, Map<String, Object>> nonMapMA = new HashMap<>();
     Map<String, Object> taskStats2 = new HashMap<>();
     taskStats2.put("movingAverages", "not-a-map");
     nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
-    Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
+    Assert.assertEquals(
+        "Non-map movingAverages should be ignored",
+        -1.,
+        CostBasedAutoScaler.extractMovingAverage(nonMapMA),
+        0.0001
+    );
   }
 
   @Test
@@ -377,26 +418,29 @@ public class CostBasedAutoScalerTest
     when(supervisor.getIoConfig()).thenReturn(ioConfig);
     when(ioConfig.getStream()).thenReturn("stream");
 
-    // Test config defaults for scaleDownBarrier, defaultProcessingRate, scaleDownDuringTaskRolloverOnly
+    // Test config defaults for minScaleDownDelay, defaultProcessingRate, scaleDownDuringTaskRolloverOnly
     CostBasedAutoScalerConfig cfgWithDefaults = CostBasedAutoScalerConfig.builder()
-                                                                          .taskCountMax(10)
-                                                                          .taskCountMin(1)
-                                                                          .enableTaskAutoScaler(true)
-                                                                          .build();
-    Assert.assertEquals(5, cfgWithDefaults.getScaleDownBarrier());
+                                                                         .taskCountMax(10)
+                                                                         .taskCountMin(1)
+                                                                         .enableTaskAutoScaler(true)
+                                                                         .build();
+    Assert.assertEquals(
+        CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY,
+        cfgWithDefaults.getMinScaleDownDelay()
+    );
     Assert.assertEquals(1000.0, cfgWithDefaults.getDefaultProcessingRate(), 0.001);
     Assert.assertFalse(cfgWithDefaults.isScaleDownOnTaskRolloverOnly());
 
     // Test custom config values
     CostBasedAutoScalerConfig cfgWithCustom = CostBasedAutoScalerConfig.builder()
-                                                                        .taskCountMax(10)
-                                                                        .taskCountMin(1)
-                                                                        .enableTaskAutoScaler(true)
-                                                                        .scaleDownBarrier(10)
-                                                                        .defaultProcessingRate(5000.0)
-                                                                        .scaleDownDuringTaskRolloverOnly(true)
-                                                                        .build();
-    Assert.assertEquals(10, cfgWithCustom.getScaleDownBarrier());
+                                                                       .taskCountMax(10)
+                                                                       .taskCountMin(1)
+                                                                       .enableTaskAutoScaler(true)
+                                                                       .minScaleDownDelay(Duration.standardMinutes(10))
+                                                                       .defaultProcessingRate(5000.0)
+                                                                       .scaleDownDuringTaskRolloverOnly(true)
+                                                                       .build();
+    Assert.assertEquals(Duration.standardMinutes(10), cfgWithCustom.getMinScaleDownDelay());
     Assert.assertEquals(5000.0, cfgWithCustom.getDefaultProcessingRate(), 0.001);
     Assert.assertTrue(cfgWithCustom.isScaleDownOnTaskRolloverOnly());
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 4f5832915f6..c5b2b867e66 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -102,7 +102,7 @@ public class WeightedCostFunctionTest
 
     // Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
     double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig).totalCost();
-    Assert.assertEquals("Cost at current tasks", 1000., costCurrent, 0.1);
+    Assert.assertEquals("Cost of current tasks", 1000., costCurrent, 0.1);
 
     // Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666
     double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig).totalCost();
@@ -298,11 +298,12 @@ public class WeightedCostFunctionTest
   @Test
   public void testLagAmplificationReducesIdleUnderHighLag()
   {
-    CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder()
+    CostBasedAutoScalerConfig configWithThreshold = CostBasedAutoScalerConfig.builder()
                                                                         .taskCountMax(100)
                                                                         .taskCountMin(1)
                                                                         .enableTaskAutoScaler(true)
                                                                         .defaultProcessingRate(1000.0)
+                                                                        .highLagThreshold(10_000)
                                                                         .build();
 
     int currentTaskCount = 3;
@@ -310,17 +311,96 @@ public class WeightedCostFunctionTest
     int partitionCount = 30;
     double pollIdleRatio = 0.1;
 
+    // lowLag (5000) is below threshold, highLag (500000) is well above
     CostMetrics lowLag = createMetrics(5_000.0, currentTaskCount, partitionCount, pollIdleRatio);
     CostMetrics highLag = createMetrics(500_000.0, currentTaskCount, partitionCount, pollIdleRatio);
 
-    double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, idleOnlyConfig).totalCost();
-    double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, idleOnlyConfig).totalCost();
+    double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, configWithThreshold).totalCost();
+    double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, configWithThreshold).totalCost();
     Assert.assertTrue(
         "Higher lag should reduce predicted idle more aggressively",
         lowLagCost > highLagCost
     );
   }
 
+  @Test
+  public void testCustomLagThresholdsAffectCostCalculation()
+  {
+    // Test that custom threshold values change behavior compared to defaults
+    int currentTaskCount = 3;
+    int proposedTaskCount = 8;
+    int partitionCount = 30;
+    double pollIdleRatio = 0.1;
+
+    // Use lag that exceeds sensitive threshold (10000) but not default (-1, disabled)
+    CostMetrics metrics = createMetrics(15_000.0, currentTaskCount, partitionCount, pollIdleRatio);
+
+    // Default config: highLagThreshold=-1 (disabled), no lag amplification
+    CostBasedAutoScalerConfig defaultConfig = CostBasedAutoScalerConfig.builder()
+                                                                        .taskCountMax(100)
+                                                                        .taskCountMin(1)
+                                                                        .enableTaskAutoScaler(true)
+                                                                        .defaultProcessingRate(1000.0)
+                                                                        .build();
+
+    // Sensitive config: threshold 10000, lag 15000 > 10000, amplification happens
+    CostBasedAutoScalerConfig sensitiveConfig = CostBasedAutoScalerConfig.builder()
+                                                                          .taskCountMax(100)
+                                                                          .taskCountMin(1)
+                                                                          .enableTaskAutoScaler(true)
+                                                                          .defaultProcessingRate(1000.0)
+                                                                          .highLagThreshold(10000)
+                                                                          .build();
+
+    double defaultCost = costFunction.computeCost(metrics, proposedTaskCount, defaultConfig).totalCost();
+    double sensitiveCost = costFunction.computeCost(metrics, proposedTaskCount, sensitiveConfig).totalCost();
+
+    // With lower thresholds, the same lag triggers more aggressive scaling behavior
+    // (higher lagBusyFactor), which results in lower predicted idle and thus lower idle cost
+    Assert.assertTrue(
+        "More sensitive thresholds should result in different (lower) cost",
+        sensitiveCost < defaultCost
+    );
+  }
+
+  @Test
+  public void testLnSeverityScalesWithLag()
+  {
+    // Test that ln_severity lagBusyFactor increases with lag severity,
+    // producing lower idle cost at higher lag.
+    // lagSeverity = lagPerPartition / threshold
+    // lagBusyFactor = min(1.0, ln(lagSeverity) / ln(5))
+    int currentTaskCount = 3;
+    int proposedTaskCount = 8;
+    int partitionCount = 30;
+    double pollIdleRatio = 0.1;
+
+    CostBasedAutoScalerConfig customConfig = CostBasedAutoScalerConfig.builder()
+                                                                       .taskCountMax(100)
+                                                                       .taskCountMin(1)
+                                                                       .enableTaskAutoScaler(true)
+                                                                       .defaultProcessingRate(1000.0)
+                                                                       .highLagThreshold(10000)
+                                                                       .build();
+
+    // Lag exactly at threshold (lagPerPartition = 10000, severity=1.0)
+    // lagBusyFactor = ln(1) / ln(5) = 0
+    CostMetrics atThreshold = createMetrics(10_000.0, currentTaskCount, partitionCount, pollIdleRatio);
+
+    // Lag at 5x threshold (lagPerPartition = 50000, severity=5.0)
+    // lagBusyFactor = ln(5) / ln(5) = 1.0
+    CostMetrics atMaxSeverity = createMetrics(50_000.0, currentTaskCount, partitionCount, pollIdleRatio);
+
+    double costAtThreshold = costFunction.computeCost(atThreshold, proposedTaskCount, customConfig).totalCost();
+    double costAtMax = costFunction.computeCost(atMaxSeverity, proposedTaskCount, customConfig).totalCost();
+
+    // At max severity, lagBusyFactor=1.0, idle is fully suppressed → lower cost
+    Assert.assertTrue(
+        "Cost at max severity should be lower due to full idle suppression",
+        costAtMax < costAtThreshold
+    );
+  }
+
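
The comments in testLnSeverityScalesWithLag spell out the idle-decay factor directly:
lagSeverity = lagPerPartition / threshold and lagBusyFactor = min(1.0, ln(lagSeverity) / ln(5)).
A tiny numeric sketch of how that factor ramps from 0 at the threshold to 1 at five times
the threshold; the names are illustrative, not the WeightedCostFunction internals:

    // Illustrative: the logarithmic idle-decay factor exercised by the test above.
    class LagBusyFactorSketch
    {
      static double lagBusyFactor(double lagPerPartition, double highLagThreshold)
      {
        if (highLagThreshold <= 0 || lagPerPartition <= highLagThreshold) {
          return 0.0;  // amplification disabled, or lag still at/below the threshold
        }
        double lagSeverity = lagPerPartition / highLagThreshold;
        return Math.min(1.0, Math.log(lagSeverity) / Math.log(5.0));
      }

      public static void main(String[] args)
      {
        System.out.println(lagBusyFactor(10_000, 10_000));  // 0.0   (at the threshold)
        System.out.println(lagBusyFactor(20_000, 10_000));  // ~0.43
        System.out.println(lagBusyFactor(50_000, 10_000));  // 1.0   (5x threshold: idle fully suppressed)
      }
    }

A factor of 1.0 fully suppresses the predicted idle contribution, which is why the test
expects the cost at five times the threshold to be strictly lower than the cost at the threshold.
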
   private CostMetrics createMetrics(
       double avgPartitionLag,
       int currentTaskCount,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

