This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e6a01b747c8 Adjust cost-based autoscaler algorithm (#18936)
e6a01b747c8 is described below

commit e6a01b747c81677dea02da3591791f26d5e333e1
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Mon Jan 26 19:29:41 2026 +0200

    Adjust cost-based autoscaler algorithm (#18936)
---
 .../supervisor/SeekableStreamSupervisor.java       |  14 +-
 .../SeekableStreamSupervisorIOConfig.java          |   2 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  93 ++++++++++--
 .../autoscaler/CostBasedAutoScalerConfig.java      |   2 +-
 .../autoscaler/WeightedCostFunction.java           |  67 ++++++---
 .../SeekableStreamSupervisorIOConfigTest.java      |   2 +-
 ...treamSupervisorScaleDuringTaskRolloverTest.java |  19 +--
 .../SeekableStreamSupervisorSpecTest.java          |   4 +-
 .../autoscaler/CostBasedAutoScalerTest.java        | 164 ++++++++++++++++++---
 .../autoscaler/WeightedCostFunctionTest.java       | 134 ++++++++++-------
 10 files changed, 377 insertions(+), 124 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index c5d3ffe873d..40049967b2b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3403,7 +3403,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         group.completionTimeout = 
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
         pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new 
CopyOnWriteArrayList<>()).add(group);
 
-
         boolean endOffsetsAreInvalid = false;
         for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
           if (entry.getValue().equals(getEndOfPartitionMarker())) {
@@ -3453,17 +3452,26 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * This method is invoked to determine whether a task count adjustment is 
needed
    * during a task rollover based on the recommendations from the task 
auto-scaler.
  */
+
   @VisibleForTesting
   void maybeScaleDuringTaskRollover()
   {
     if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
       int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
-      if (rolloverTaskCount > 0) {
+      if (rolloverTaskCount > 0 && rolloverTaskCount != 
getIoConfig().getTaskCount()) {
         log.info("Autoscaler recommends scaling down to [%d] tasks during 
rollover", rolloverTaskCount);
         changeTaskCountInIOConfig(rolloverTaskCount);
         // Here force reset the supervisor state to be re-calculated on the 
next iteration of runInternal() call.
         // This seems the best way to inject task amount recalculation during 
the rollover.
         clearAllocationInfo();
+
+        ServiceMetricEvent.Builder event = ServiceMetricEvent
+            .builder()
+            .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
+            .setDimension(DruidMetrics.DATASOURCE, dataSource)
+            .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
+
+        emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
rolloverTaskCount));
       }
     }
   }
@@ -4048,7 +4056,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           builder.put(partitionId, makeSequenceNumber(sequence, 
useExclusiveStartSequenceNumberForNonFirstSequence()));
         }
       } else {
-        // if we don't have a startingOffset (first run or we had some 
previous failures and reset the sequences) then
+        // if we don't have a startingOffset (first run, or we had some 
previous failures and reset the sequences) then
         // get the sequence from metadata storage (if available) or 
Kafka/Kinesis (otherwise)
         OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = 
getOffsetFromStorageForPartition(
             partitionId,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 8a7fcad1511..0de06a63949 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -86,7 +86,7 @@ public abstract class SeekableStreamSupervisorIOConfig
     // Could be null
     this.autoScalerConfig = autoScalerConfig;
     this.autoScalerEnabled = autoScalerConfig != null && 
autoScalerConfig.getEnableTaskAutoScaler();
-    // if autoscaler is enabled then taskCount will be ignored here and 
initial taskCount will equal to taskCountStart/taskCountMin
+    // if autoscaler is enabled, then taskCount will be ignored here and 
initial taskCount will equal to taskCountStart/taskCountMin
     if (autoScalerEnabled) {
       final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
       this.taskCount = startTaskCount != null ? startTaskCount : 
autoScalerConfig.getTaskCountMin();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 598a56e41d1..350e2284359 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
+import it.unimi.dsi.fastutil.ints.IntArraySet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -33,9 +35,7 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -57,6 +57,25 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
   private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
   private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+  /**
+   * Defines the step size used for evaluating lag when computing scaling 
actions.
+   * This constant helps control the granularity of lag considerations in 
scaling decisions,
+   * ensuring smoother transitions between scaled states and avoiding abrupt 
changes in task counts.
+   */
+  private static final int LAG_STEP = 100_000;
+  /**
+   * This parameter fine-tunes autoscaling behavior by adding extra flexibility
+   * when calculating maximum allowable partitions per task in response to lag,
+   * which must be processed as fast, as possible.
+   * It acts as a foundational factor that balances the responsiveness and 
stability of autoscaling.
+   */
+  private static final int BASE_RAW_EXTRA = 5;
+  // Base PPT lag threshold allowing to activate a burst scaleup to eliminate 
high lag.
+  static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
+  // Extra PPT lag threshold allowing activation of even more aggressive 
scaleup to eliminate high lag,
+  // also enabling lag-amplified idle calculation decay in the cost function 
(to reduce idle weight).
+  static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 100_000;
+
   public static final String LAG_COST_METRIC = 
"task/autoScaler/costBased/lagCost";
   public static final String IDLE_COST_METRIC = 
"task/autoScaler/costBased/idleCost";
   public static final String OPTIMAL_TASK_COUNT_METRIC = 
"task/autoScaler/costBased/optimalTaskCount";
@@ -160,9 +179,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
    * Returns -1 (no scaling needed) in the following cases:
    * <ul>
    *   <li>Metrics are not available</li>
-   *   <li>Task count already optimal</li>
-   *   <li>The current idle ratio is in the ideal range and lag considered 
low</li>
-   *   <li>Optimal task count equals current task count</li>
+   *   <li>Current task count already optimal</li>
    * </ul>
    *
    * @return optimal task count for scale-up, or -1 if no scaling action needed
@@ -180,7 +197,12 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
       return -1;
     }
 
-    final int[] validTaskCounts = 
CostBasedAutoScaler.computeValidTaskCounts(partitionCount, currentTaskCount);
+    final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
+        partitionCount,
+        currentTaskCount,
+        (long) metrics.getAggregateLag(),
+        config.getTaskCountMax()
+    );
 
     if (validTaskCounts.length == 0) {
       log.warn("No valid task counts after applying constraints for 
supervisorId [%s]", supervisorId);
@@ -230,22 +252,35 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   }
 
   /**
-   * Generates valid task counts based on partitions-per-task ratios.
+   * Generates valid task counts based on partitions-per-task ratios and 
lag-driven PPT relaxation.
    * This enables gradual scaling and avoids large jumps.
    * Limits the range of task counts considered to avoid excessive computation.
    *
    * @return sorted list of valid task counts within bounds
    */
-  static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
+  static int[] computeValidTaskCounts(
+      int partitionCount,
+      int currentTaskCount,
+      double aggregateLag,
+      int taskCountMax
+  )
   {
-    if (partitionCount <= 0) {
+    if (partitionCount <= 0 || currentTaskCount <= 0) {
       return new int[]{};
     }
 
-    Set<Integer> result = new HashSet<>();
+    IntSet result = new IntArraySet();
     final int currentPartitionsPerTask = partitionCount / currentTaskCount;
+    final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
+        aggregateLag,
+        partitionCount,
+        currentTaskCount,
+        taskCountMax
+    );
+    final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + 
extraIncrease;
+
     // Minimum partitions per task correspond to the maximum number of tasks 
(scale up) and vice versa.
-    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 
MAX_INCREASE_IN_PARTITIONS_PER_TASK);
+    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 
effectiveMaxIncrease);
     final int maxPartitionsPerTask = Math.min(
         partitionCount,
         currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
@@ -253,9 +288,41 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= 
minPartitionsPerTask; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / 
partitionsPerTask;
-      result.add(taskCount);
+      if (taskCount <= taskCountMax) {
+        result.add(taskCount);
+      }
     }
-    return result.stream().mapToInt(Integer::intValue).toArray();
+    return result.toIntArray();
+  }
+
+  /**
+   * Computes extra allowed increase in partitions-per-task in scenarios when 
the average per-partition lag
+   * is above the configured threshold. By default, it is {@code 
EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD}.
+   * Generally, one of the autoscaler priorities is to keep the lag as close 
to zero as possible.
+   */
+  static int computeExtraMaxPartitionsPerTaskIncrease(
+      double aggregateLag,
+      int partitionCount,
+      int currentTaskCount,
+      int taskCountMax
+  )
+  {
+    if (partitionCount <= 0 || taskCountMax <= 0) {
+      return 0;
+    }
+
+    final double lagPerPartition = aggregateLag / partitionCount;
+    if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+      return 0;
+    }
+
+    int rawExtra = BASE_RAW_EXTRA;
+    if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+      rawExtra += (int) ((lagPerPartition - 
AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP);
+    }
+
+    final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount 
/ taskCountMax);
+    return (int) (rawExtra * headroomRatio);
   }
 
   /**
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index d1f682f4f56..aba26ba25b5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -52,7 +52,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   private final boolean enableTaskAutoScaler;
   private final int taskCountMax;
   private final int taskCountMin;
-  private final Integer taskCountStart;
+  private Integer taskCountStart;
   private final long minTriggerScaleActionFrequencyMillis;
   private final Double stopTaskCountRatio;
   private final long scaleActionPeriodMillis;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 0da733ef9e7..8a375955691 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -29,24 +29,25 @@ import org.apache.druid.java.util.common.logger.Logger;
 public class WeightedCostFunction
 {
   private static final Logger log = new Logger(WeightedCostFunction.class);
-
-
   /**
-   * Ideal idle ratio range boundaries.
-   * Idle ratio below MIN indicates tasks are overloaded (scale up needed).
-   * Idle ratio above MAX indicates tasks are underutilized (scale down 
needed).
+   * Represents the maximum multiplier factor applied to amplify lag-based 
costs in the cost computation process.
+   * This value is used to cap the lag amplification effect to prevent 
excessively high cost inflation
+   * caused by significant partition lag.
+   * It ensures that lag-related adjustments remain bounded within a 
reasonable range for stability of
+   * cost-based auto-scaling decisions.
    */
-  static final double IDEAL_IDLE_MIN = 0.2;
-  static final double IDEAL_IDLE_MAX = 0.6;
-
+  private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0;
+  private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L;
   /**
-   * Checks if the given idle ratio is within the ideal range [{@value 
#IDEAL_IDLE_MIN}, {@value #IDEAL_IDLE_MAX}].
-   * When idle is in this range, optimal utilization has been achieved and no 
scaling is needed.
+   * It is used to calculate the denominator for the ramp formula in the cost
+   * computation logic. This value represents the difference between the 
maximum lag per
+   * partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling 
activation
+   * lag threshold 
(CostBasedAutoScaler.EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD).
+   * <p>
+   * It is impacting how the cost model evaluates scaling decisions during 
high-lag sceario.
    */
-  public static boolean isIdleInIdealRange(double idleRatio)
-  {
-    return idleRatio >= IDEAL_IDLE_MIN && idleRatio <= IDEAL_IDLE_MAX;
-  }
+  private static final double RAMP_DENOMINATOR =
+      LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
 
   /**
    * Computes cost for a given task count using compute time metrics.
@@ -104,12 +105,15 @@ public class WeightedCostFunction
     return new CostResult(cost, lagCost, weightedIdleCost);
   }
 
-
   /**
-   * Estimates the idle ratio for a given task count using a capacity-based 
linear model.
+   * Estimates the idle ratio for a proposed task count.
+   * Includes lag-based adjustment to eliminate high lag and
+   * reduce predicted idle when work exists.
    * <p>
-   * Formula: {@code predictedIdle = 1 - busyFraction / taskRatio}
-   * where {@code busyFraction = 1 - currentIdleRatio} and {@code taskRatio = 
targetTaskCount / currentTaskCount}.
+   * Formulas:
+   * {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)}
+   * {@code lagBusyFactor = 1 - exp(-lagPerTask / LAG_SCALE_FACTOR)}
+   * {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)}
    *
    * @param metrics   current system metrics containing idle ratio and task 
count
    * @param taskCount target task count to estimate an idle ratio for
@@ -119,7 +123,6 @@ public class WeightedCostFunction
   {
     final double currentPollIdleRatio = metrics.getPollIdleRatio();
 
-    // Handle edge cases
     if (currentPollIdleRatio < 0) {
       // No idle data available, assume moderate idle
       return 0.5;
@@ -130,13 +133,33 @@ public class WeightedCostFunction
       return currentPollIdleRatio;
     }
 
-    // Capacity-based model: idle ratio reflects spare capacity per task
+    // Linear prediction (capacity-based) - existing logic
     final double busyFraction = 1.0 - currentPollIdleRatio;
     final double taskRatio = (double) taskCount / currentTaskCount;
-    final double predictedIdleRatio = 1.0 - busyFraction / taskRatio;
+    final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - 
busyFraction / taskRatio));
+
+    // Lag-based adjustment: more work per task → less idle
+    final double lagPerTask = metrics.getAggregateLag() / taskCount;
+    double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / 
CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD);
+    final int partitionCount = metrics.getPartitionCount();
+
+    if (partitionCount > 0) {
+      final double lagPerPartition = metrics.getAggregateLag() / 
partitionCount;
+      // Lag-amplified idle decay
+      if (lagPerPartition >= 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
+        double ramp = Math.max(0.0,
+                               (lagPerPartition - 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD)
+                               / RAMP_DENOMINATOR
+        );
+        ramp = Math.min(1.0, ramp);
+
+        final double multiplier = 1.0 + ramp * 
(LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0);
+        lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier);
+      }
+    }
 
     // Clamp to valid range [0, 1]
-    return Math.max(0.0, Math.min(1.0, predictedIdleRatio));
+    return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor));
   }
 
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
index a97e971028c..56b8904cc18 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
@@ -80,7 +80,7 @@ public class SeekableStreamSupervisorIOConfigTest
   }
 
   @Test
-  public void testAutoScalerEnabledTrueAndFalse()
+  public void testAutoScalerEnabledPreservesTaskCountWhenNonNull()
   {
     LagAggregator lagAggregator = mock(LagAggregator.class);
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
index 2c4275a99ec..b39e6097903 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -61,11 +61,7 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     supervisor.maybeScaleDuringTaskRollover();
 
     // Then
-    Assert.assertEquals(
-        "Task count should not change when taskAutoScaler is null",
-        beforeTaskCount,
-        (int) supervisor.getIoConfig().getTaskCount()
-    );
+    Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig());
   }
 
   @Test
@@ -88,6 +84,7 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     supervisor.maybeScaleDuringTaskRollover();
 
     // Then
+    Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
     Assert.assertEquals(
         "Task count should not change when rolloverTaskCount <= 0",
         beforeTaskCount,
@@ -111,12 +108,11 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     supervisor.start();
     supervisor.createAutoscaler(spec);
 
-    Assert.assertEquals(DEFAULT_TASK_COUNT, (int) 
supervisor.getIoConfig().getTaskCount());
-
     // When
     supervisor.maybeScaleDuringTaskRollover();
 
     // Then
+    Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
     Assert.assertEquals(
         "Task count should be updated to " + targetTaskCount + " when 
rolloverTaskCount > 0",
         targetTaskCount,
@@ -144,6 +140,7 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     supervisor.maybeScaleDuringTaskRollover();
 
     // Then
+    Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
     Assert.assertEquals(
         "Task count should not change when rolloverTaskCount is 0",
         beforeTaskCount,
@@ -201,13 +198,11 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
   private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig()
   {
     return CostBasedAutoScalerConfig.builder()
+                                    .enableTaskAutoScaler(true)
                                     .taskCountMax(100)
                                     .taskCountMin(1)
-                                    .taskCountStart(DEFAULT_TASK_COUNT)
-                                    .enableTaskAutoScaler(true)
-                                    .lagWeight(0.25)
-                                    .idleWeight(0.75)
-                                    .scaleActionPeriodMillis(100)
+                                    .taskCountStart(1)
+                                    .scaleActionPeriodMillis(60000)
                                     .build();
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 7df22c467a1..db805c21187 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -1457,7 +1457,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
           "stream",
           new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
           1,
-          taskCount,
+          null, // autoscaler uses taskCountStart/taskCountMin for the initial 
value
           new Period("PT1H"),
           new Period("P1D"),
           new Period("PT30S"),
@@ -1478,7 +1478,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
           "stream",
           new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
           1,
-          taskCount,
+          null, // autoscaler uses taskCountStart/taskCountMin for the initial 
value
           new Period("PT1H"),
           new Period("P1D"),
           new Period("PT30S"),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index caf5453f521..54299d915f6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -30,6 +30,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,11 +38,16 @@ import java.util.Map;
 import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
 import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
 import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease;
+import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("SameParameterValue")
 public class CostBasedAutoScalerTest
 {
   private CostBasedAutoScaler autoScaler;
+  private CostBasedAutoScalerConfig config;
 
   @Before
   public void setUp()
@@ -55,13 +61,13 @@ public class CostBasedAutoScalerTest
     when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
     when(mockIoConfig.getStream()).thenReturn("test-stream");
 
-    CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
-                                                                
.taskCountMax(100)
-                                                                
.taskCountMin(1)
-                                                                
.enableTaskAutoScaler(true)
-                                                                .lagWeight(0.6)
-                                                                
.idleWeight(0.4)
-                                                                .build();
+    config = CostBasedAutoScalerConfig.builder()
+                                      .taskCountMax(100)
+                                      .taskCountMin(1)
+                                      .enableTaskAutoScaler(true)
+                                      .lagWeight(0.6)
+                                      .idleWeight(0.4)
+                                      .build();
 
     autoScaler = new CostBasedAutoScaler(mockSupervisor, config, 
mockSupervisorSpec, mockEmitter);
   }
@@ -70,26 +76,122 @@ public class CostBasedAutoScalerTest
   public void testComputeValidTaskCounts()
   {
     // For 100 partitions at 25 tasks (4 partitions/task), valid counts 
include 25 and 34
-    int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 
25);
+    int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 100);
     Assert.assertTrue("Should contain the current task count", 
contains(validTaskCounts, 25));
     Assert.assertTrue("Should contain the next scale-up option", 
contains(validTaskCounts, 34));
 
     // Edge cases
-    Assert.assertEquals("Zero partitions return empty array", 0, 
CostBasedAutoScaler.computeValidTaskCounts(0, 10).length);
-    Assert.assertEquals("Negative partitions return empty array", 0, 
CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length);
+    Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 100).length);
+    Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 100).length);
 
     // Single partition
-    int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1);
+    int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 100);
     Assert.assertTrue("Single partition should have at least one valid count", 
singlePartition.length > 0);
     Assert.assertTrue("Single partition should contain 1", 
contains(singlePartition, 1));
 
     // Current exceeds partitions - should still yield valid, deduplicated 
options
-    int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5);
+    int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 100);
     Assert.assertEquals(2, exceedsPartitions.length);
     Assert.assertTrue(contains(exceedsPartitions, 1));
     Assert.assertTrue(contains(exceedsPartitions, 2));
   }
 
+  @Test
+  public void testComputeValidTaskCountsLagExpansion()
+  {
+    int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30);
+    Assert.assertFalse("Low lag should not include max task count", 
contains(lowLagCounts, 30));
+    Assert.assertTrue("Low lag should cap scale up around 4 tasks", 
contains(lowLagCounts, 4));
+
+    long highAggregateLag = 30L * 500_000L;
+    int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30);
+    Assert.assertTrue("High lag should allow scaling to max tasks", 
contains(highLagCounts, 30));
+  }
+
+  @Test
+  public void testComputeValidTaskCountsRespectsTaskCountMax()
+  {
+    long highAggregateLag = 30L * 500_000L;
+    int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3);
+    Assert.assertTrue("Should include taskCountMax when doable", 
contains(cappedCounts, 3));
+    Assert.assertFalse("Should not exceed taskCountMax", 
contains(cappedCounts, 4));
+  }
+
+  @Test
+  public void testScalingExamplesTable()
+  {
+    int partitionCount = 30;
+    int taskCountMax = 30;
+    double pollIdleRatio = 0.1;
+    double avgProcessingRate = 10.0;
+
+    class Example
+    {
+      final int currentTasks;
+      final long lagPerPartition;
+      final int expectedTasks;
+
+      Example(int currentTasks, long lagPerPartition, int expectedTasks)
+      {
+        this.currentTasks = currentTasks;
+        this.lagPerPartition = lagPerPartition;
+        this.expectedTasks = expectedTasks;
+      }
+    }
+
+    Example[] examples = new Example[]{
+        new Example(3, 50_000L, 8),
+        new Example(3, 300_000L, 15),
+        new Example(3, 500_000L, 30),
+        new Example(10, 100_000L, 15),
+        new Example(10, 300_000L, 30),
+        new Example(10, 500_000L, 30),
+        new Example(20, 500_000L, 30),
+        new Example(25, 500_000L, 30)
+    };
+
+    for (Example example : examples) {
+      long aggregateLag = example.lagPerPartition * partitionCount;
+      int[] validCounts = computeValidTaskCounts(partitionCount, 
example.currentTasks, aggregateLag, taskCountMax);
+      Assert.assertTrue(
+          "Should include expected task count for current=" + 
example.currentTasks + ", lag=" + example.lagPerPartition,
+          contains(validCounts, example.expectedTasks)
+      );
+
+      CostMetrics metrics = createMetricsWithRate(
+          example.lagPerPartition,
+          example.currentTasks,
+          partitionCount,
+          pollIdleRatio,
+          avgProcessingRate
+      );
+      int actualOptimal = autoScaler.computeOptimalTaskCount(metrics);
+      if (actualOptimal == -1) {
+        actualOptimal = example.currentTasks;
+      }
+      Assert.assertEquals(
+          "Optimal task count should match for current=" + example.currentTasks
+          + ", lag=" + example.lagPerPartition
+          + ", valid=" + Arrays.toString(validCounts),
+          example.expectedTasks,
+          actualOptimal
+      );
+    }
+  }
+
+  @Test
+  public void testComputeExtraPPTIncrease()
+  {
+    // No extra increase below the threshold
+    Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 
49_000L, 30, 3, 30));
+    Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * 
EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30));
+
+    // More aggressive increase when the lag is high
+    Assert.assertEquals(6, computeExtraMaxPartitionsPerTaskIncrease(30L * 
300_000L, 30, 3, 30));
+    // Zero when on max task count
+    Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 
500_000L, 30, 30, 30));
+  }
+
   @Test
   public void testComputeOptimalTaskCountInvalidInputs()
   {
@@ -110,7 +212,7 @@ public class CostBasedAutoScalerTest
     int highIdleResult = 
autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
     Assert.assertTrue("Scale down scenario should return optimal <= current", 
highIdleResult <= 50);
 
-    // With low idle and balanced weights, algorithm should not scale up 
aggressively
+    // With low idle and balanced weights, the algorithm should not scale up 
aggressively
     int lowIdleResult = 
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
     Assert.assertTrue("With low idle and balanced weights, should not scale up 
aggressively", lowIdleResult <= 25);
   }
@@ -194,10 +296,16 @@ public class CostBasedAutoScalerTest
   {
     // 15-minute average is preferred
     Map<String, Map<String, Object>> fifteenMin = new HashMap<>();
-    fifteenMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
+    fifteenMin.put(
+        "0",
+        Collections.singletonMap(
+            "task-0",
+            buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 
1500.0)
+        )
+    );
     Assert.assertEquals(1500.0, 
CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
 
-    // 1-minute as final fallback
+    // 1-minute as a final fallback
     Map<String, Map<String, Object>> oneMin = new HashMap<>();
     oneMin.put("0", Collections.singletonMap("task-0", 
buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
     Assert.assertEquals(500.0, 
CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
@@ -328,7 +436,7 @@ public class CostBasedAutoScalerTest
   @Test
   public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
   {
-    // Tests the case where lastKnownMetrics is null (no 
computeTaskCountForScaleAction called)
+    // Tests the case where the lastKnownMetrics is null (no 
computeTaskCountForScaleAction called)
     SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
     SeekableStreamSupervisor supervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
     ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
@@ -369,6 +477,24 @@ public class CostBasedAutoScalerTest
     );
   }
 
+  private CostMetrics createMetricsWithRate(
+      double avgPartitionLag,
+      int currentTaskCount,
+      int partitionCount,
+      double pollIdleRatio,
+      double avgProcessingRate
+  )
+  {
+    return new CostMetrics(
+        avgPartitionLag,
+        currentTaskCount,
+        partitionCount,
+        pollIdleRatio,
+        3600,
+        avgProcessingRate
+    );
+  }
+
   private boolean contains(int[] array, int value)
   {
     for (int i : array) {
@@ -434,7 +560,11 @@ public class CostBasedAutoScalerTest
     return taskStats;
   }
 
-  private Map<String, Object> buildTaskStatsWithNullInterval(String 
nullInterval, String validInterval, double processedRate)
+  private Map<String, Object> buildTaskStatsWithNullInterval(
+      String nullInterval,
+      String validInterval,
+      double processedRate
+  )
   {
     Map<String, Object> buildSegments = new HashMap<>();
     buildSegments.put(nullInterval, null);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 90b50477c92..416def7e3ab 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -51,20 +51,24 @@ public class WeightedCostFunctionTest
     Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0);
     Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0);
     Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0);
-    Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, 
config).totalCost(), 0.0);
+    Assert.assertEquals(
+        Double.POSITIVE_INFINITY,
+        costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, 
config).totalCost(),
+        0.0
+    );
   }
 
   @Test
   public void testScaleDownHasHigherLagCostThanCurrent()
   {
     CostBasedAutoScalerConfig lagOnlyConfig = 
CostBasedAutoScalerConfig.builder()
-        .taskCountMax(100)
-        .taskCountMin(1)
-        .enableTaskAutoScaler(true)
-        .lagWeight(1.0)
-        .idleWeight(0.0)
-        .defaultProcessingRate(100.0)
-        .build();
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.lagWeight(1.0)
+                                                                       
.idleWeight(0.0)
+                                                                       
.defaultProcessingRate(100.0)
+                                                                       
.build();
 
     CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3);
 
@@ -85,13 +89,13 @@ public class WeightedCostFunctionTest
     // With lag-only config (no idle penalty), the marginal model is used for 
scale-up:
     // lagRecoveryTime = aggregateLag / (taskCountDiff * rate)
     CostBasedAutoScalerConfig lagOnlyConfig = 
CostBasedAutoScalerConfig.builder()
-        .taskCountMax(100)
-        .taskCountMin(1)
-        .enableTaskAutoScaler(true)
-        .lagWeight(1.0)
-        .idleWeight(0.0)
-        .defaultProcessingRate(1000.0)
-        .build();
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.lagWeight(1.0)
+                                                                       
.idleWeight(0.0)
+                                                                       
.defaultProcessingRate(1000.0)
+                                                                       
.build();
 
     // aggregateLag = 100000 * 100 = 10,000,000
     CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
@@ -113,18 +117,13 @@ public class WeightedCostFunctionTest
   }
 
   @Test
-  public void testBalancedWeightsFavorStabilityOverScaleUp()
+  public void testBalancedWeightsFavorStabilityOverScaleUpOnSmallLag()
   {
-    // With the marginal lag model and corrected idle ratio, balanced weights
-    // favor stability because idle cost increases significantly with more 
tasks
-    // This is intentional behavior: the algorithm is conservative about 
scale-up.
-    CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
-
+    // Validate idle ratio estimation and ensure balanced weights still favor 
stability.
+    CostMetrics metrics = createMetrics(100.0, 10, 100, 0.3);
     double costCurrent = costFunction.computeCost(metrics, 10, 
config).totalCost();
     double costScaleUp = costFunction.computeCost(metrics, 20, 
config).totalCost();
 
-    // With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from
-    // scaling up dominates the lag recovery benefit
     Assert.assertTrue(
         "With balanced weights, staying at current count is cheaper than 
scale-up",
         costCurrent < costScaleUp
@@ -193,13 +192,13 @@ public class WeightedCostFunctionTest
 
     // Use lag-only config to isolate the lag recovery time component
     CostBasedAutoScalerConfig lagOnlyConfig = 
CostBasedAutoScalerConfig.builder()
-        .taskCountMax(100)
-        .taskCountMin(1)
-        .enableTaskAutoScaler(true)
-        .lagWeight(1.0)
-        .idleWeight(0.0)
-        .defaultProcessingRate(1000.0)
-        .build();
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.lagWeight(1.0)
+                                                                       
.idleWeight(0.0)
+                                                                       
.defaultProcessingRate(1000.0)
+                                                                       
.build();
 
     double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount 
+ 5, lagOnlyConfig).totalCost();
     double costDown5 = costFunction.computeCost(metricsNoRate, 
currentTaskCount - 5, lagOnlyConfig).totalCost();
@@ -218,13 +217,13 @@ public class WeightedCostFunctionTest
     // Test that idle cost increases monotonically with task count.
     // With fixed load, adding more tasks means each task has less work, so 
idle increases.
     CostBasedAutoScalerConfig idleOnlyConfig = 
CostBasedAutoScalerConfig.builder()
-        .taskCountMax(100)
-        .taskCountMin(1)
-        .enableTaskAutoScaler(true)
-        .lagWeight(0.0)
-        .idleWeight(1.0)
-        .defaultProcessingRate(1000.0)
-        .build();
+                                                                        
.taskCountMax(100)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.lagWeight(0.0)
+                                                                        
.idleWeight(1.0)
+                                                                        
.defaultProcessingRate(1000.0)
+                                                                        
.build();
 
     // Current: 10 tasks with 40% idle (60% busy)
     CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
@@ -244,13 +243,13 @@ public class WeightedCostFunctionTest
   public void testIdleRatioClampingAtBoundaries()
   {
     CostBasedAutoScalerConfig idleOnlyConfig = 
CostBasedAutoScalerConfig.builder()
-        .taskCountMax(100)
-        .taskCountMin(1)
-        .enableTaskAutoScaler(true)
-        .lagWeight(0.0)
-        .idleWeight(1.0)
-        .defaultProcessingRate(1000.0)
-        .build();
+                                                                        
.taskCountMax(100)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.lagWeight(0.0)
+                                                                        
.idleWeight(1.0)
+                                                                        
.defaultProcessingRate(1000.0)
+                                                                        
.build();
 
     // Extreme scale-down: 10 tasks → 2 tasks with 40% idle
     // busyFraction = 0.6, taskRatio = 0.2
@@ -275,13 +274,13 @@ public class WeightedCostFunctionTest
   public void testIdleRatioWithMissingData()
   {
     CostBasedAutoScalerConfig idleOnlyConfig = 
CostBasedAutoScalerConfig.builder()
-        .taskCountMax(100)
-        .taskCountMin(1)
-        .enableTaskAutoScaler(true)
-        .lagWeight(0.0)
-        .idleWeight(1.0)
-        .defaultProcessingRate(1000.0)
-        .build();
+                                                                        
.taskCountMax(100)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.lagWeight(0.0)
+                                                                        
.idleWeight(1.0)
+                                                                        
.defaultProcessingRate(1000.0)
+                                                                        
.build();
 
     // Negative idle ratio indicates missing data → should default to 0.5
     CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0);
@@ -296,7 +295,38 @@ public class WeightedCostFunctionTest
     Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 3600 * 
0.5, cost20, 0.0001);
   }
 
-  private CostMetrics createMetrics(double avgPartitionLag, int 
currentTaskCount, int partitionCount, double pollIdleRatio)
+  @Test
+  public void testLagAmplificationReducesIdleUnderHighLag()
+  {
+    CostBasedAutoScalerConfig idleOnlyConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                        
.taskCountMax(100)
+                                                                        
.taskCountMin(1)
+                                                                        
.enableTaskAutoScaler(true)
+                                                                        
.defaultProcessingRate(1000.0)
+                                                                        
.build();
+
+    int currentTaskCount = 3;
+    int proposedTaskCount = 8;
+    int partitionCount = 30;
+    double pollIdleRatio = 0.1;
+
+    CostMetrics lowLag = createMetrics(40_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
+    CostMetrics highLag = createMetrics(500_000.0, currentTaskCount, 
partitionCount, pollIdleRatio);
+
+    double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, 
idleOnlyConfig).totalCost();
+    double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, 
idleOnlyConfig).totalCost();
+    Assert.assertTrue(
+        "Higher lag should reduce predicted idle more aggressively",
+        lowLagCost > highLagCost
+    );
+  }
+
+  private CostMetrics createMetrics(
+      double avgPartitionLag,
+      int currentTaskCount,
+      int partitionCount,
+      double pollIdleRatio
+  )
   {
     return new CostMetrics(
         avgPartitionLag,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to