kfaraz commented on code in PR #18819:
URL: https://github.com/apache/druid/pull/18819#discussion_r2605199325


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d"
+    );
+
+    this.metricBuilder = ServiceMetricEvent.builder()
+                                           
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                           .setDimension(
+                                               DruidMetrics.STREAM,
+                                               
this.supervisor.getIoConfig().getStream()
+                                           );
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(currentMetrics);
+    Runnable onSuccessfulScale = () -> currentMetrics.set(null);
+
+    metricsCollectionExec.scheduleAtFixedRate(
+        collectMetrics(),
+        config.getMetricsCollectionIntervalMillis(),
+        config.getMetricsCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    scalingDecisionExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        config.getScaleActionStartDelayMillis(),
+        config.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    log.info(
+        "CostBasedAutoScaler started for dataSource [%s]: collecting metrics 
every [%d]ms, "

Review Comment:
   For all the log lines in this class, please use `supervisorId` instead of 
`dataSource`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d"
+    );
+
+    this.metricBuilder = ServiceMetricEvent.builder()
+                                           
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                           .setDimension(
+                                               DruidMetrics.STREAM,
+                                               
this.supervisor.getIoConfig().getStream()
+                                           );
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(currentMetrics);
+    Runnable onSuccessfulScale = () -> currentMetrics.set(null);
+
+    metricsCollectionExec.scheduleAtFixedRate(
+        collectMetrics(),
+        config.getMetricsCollectionIntervalMillis(),
+        config.getMetricsCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    scalingDecisionExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        config.getScaleActionStartDelayMillis(),
+        config.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    log.info(
+        "CostBasedAutoScaler started for dataSource [%s]: collecting metrics 
every [%d]ms, "
+        + "evaluating scaling every [%d]ms",
+        dataSource,
+        config.getMetricsCollectionIntervalMillis(),
+        config.getScaleActionPeriodMillis()
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    scalingDecisionExec.shutdownNow();
+    metricsCollectionExec.shutdownNow();
+    log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource);
+  }
+
+  @Override
+  public void reset()
+  {
+    currentMetrics.set(null);
+  }
+
+  private Runnable collectMetrics()
+  {
+    return () -> {
+      if (spec.isSuspended()) {
+        log.debug("Supervisor [%s] is suspended, skipping metrics collection", 
dataSource);
+        return;
+      }
+
+      final LagStats lagStats = supervisor.computeLagStats();
+      if (lagStats == null) {
+        log.debug("Lag stats unavailable for dataSource [%s], skipping 
collection", dataSource);
+        return;
+      }
+
+      final int currentTaskCount = supervisor.getActiveTaskGroupsCount();
+      final int partitionCount = supervisor.getPartitionCount();
+      final double avgPartitionLag = partitionCount > 0
+                                     ? (double) lagStats.getTotalLag() / 
partitionCount
+                                     : 0.0;
+      final double pollIdleRatio = supervisor.getPollIdleRatioMetric();
+
+      currentMetrics.compareAndSet(
+          null,
+          new CostMetrics(
+              System.currentTimeMillis(),
+              avgPartitionLag,
+              currentTaskCount,
+              partitionCount,
+              pollIdleRatio
+          )
+      );
+
+      log.debug("Collected metrics for dataSource [%s]", dataSource);
+    };
+  }
+
+  /**
+   * @return optimal task count, or -1 if no scaling action needed
+   */
+  public int computeOptimalTaskCount(AtomicReference<CostMetrics> 
currentMetricsRef)
+  {
+    final CostMetrics currentMetrics = currentMetricsRef.get();
+    if (currentMetrics == null) {
+      log.debug("No metrics available yet for dataSource [%s]", dataSource);
+      return -1;
+    }
+
+    if (currentMetrics.getPartitionCount() <= 0 || 
currentMetrics.getCurrentTaskCount() <= 0) {
+      return -1;
+    }
+
+    final int currentTaskCount = currentMetrics.getCurrentTaskCount();
+    final List<Integer> validTaskCounts = FACTORS_CACHE.computeIfAbsent(

Review Comment:
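   Is the `computeFactors()` computation really heavy enough to require caching,
   especially since we are imposing a max `SCALE_FACTOR_DISCRETE_DISTANCE`?
   Note also that `FACTORS_CACHE` is a static, unsynchronized `HashMap` shared by every
   autoscaler instance (i.e. across supervisors running on different threads), so the
   `computeIfAbsent` calls on it are not thread-safe, and its entries are never evicted.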
   
   How about we simplify `computeFactors` so that we compute only the required 
factors?
   
   Example:
   
   ```java
   List<Integer> computeValidTaskCounts(int partitionCount, int currentTaskCount)
   {
      final int currentPartitionsPerTask = partitionCount / currentTaskCount;
      final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 2);
      final int maxPartitionsPerTask = Math.min(partitionCount, currentPartitionsPerTask + 2);
   
      return IntStream.rangeClosed(minPartitionsPerTask, maxPartitionsPerTask)
                      .map(partitionsPerTask -> (partitionCount / partitionsPerTask) + Math.min(partitionCount % partitionsPerTask, 1))
                      .distinct()
                      .boxed()
                      .collect(Collectors.toList());
   }
   ```
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d"
+    );
+
+    this.metricBuilder = ServiceMetricEvent.builder()
+                                           
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                           .setDimension(
+                                               DruidMetrics.STREAM,
+                                               
this.supervisor.getIoConfig().getStream()
+                                           );
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(currentMetrics);
+    Runnable onSuccessfulScale = () -> currentMetrics.set(null);
+
+    metricsCollectionExec.scheduleAtFixedRate(
+        collectMetrics(),
+        config.getMetricsCollectionIntervalMillis(),
+        config.getMetricsCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    scalingDecisionExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        config.getScaleActionStartDelayMillis(),
+        config.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    log.info(
+        "CostBasedAutoScaler started for dataSource [%s]: collecting metrics 
every [%d]ms, "
+        + "evaluating scaling every [%d]ms",
+        dataSource,
+        config.getMetricsCollectionIntervalMillis(),
+        config.getScaleActionPeriodMillis()
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    scalingDecisionExec.shutdownNow();
+    metricsCollectionExec.shutdownNow();
+    log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource);
+  }
+
+  @Override
+  public void reset()
+  {
+    currentMetrics.set(null);
+  }
+
+  private Runnable collectMetrics()

Review Comment:
   Nit: This method may just return `void` instead of a `Runnable`. If we need a 
`Runnable`, we can just use the method reference `this::collectMetrics`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,

Review Comment:
   Let's not pass this as an argument; we can always get it from the spec.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;

Review Comment:
   Use supervisor ID instead, since there may be multiple supervisors ingesting 
into the same datasource.
   
   ```suggestion
     private final String supervisorId;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Configuration for cost-based auto-scaling of seekable stream supervisor 
tasks.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are constrained to be factors/divisors of the partition count.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CostBasedAutoScalerConfig implements AutoScalerConfig
+{
+  private static final long DEFAULT_METRICS_COLLECTION_INTERVAL_MILLIS = 60 * 
1000; // 1 minute
+  private static final long DEFAULT_METRICS_COLLECTION_RANGE_MILLIS = 10 * 60 
* 1000; // 10 minutes
+  private static final long DEFAULT_SCALE_ACTION_START_DELAY_MILLIS = 5 * 60 * 
1000; // 5 minutes
+  private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 
1000; // 10 minutes
+  private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS 
= 1200000; // 20 minutes
+  private static final double DEFAULT_LAG_WEIGHT = 0.25;
+  private static final double DEFAULT_IDLE_WEIGHT = 0.75;
+
+  private final boolean enableTaskAutoScaler;
+  private final int taskCountMax;
+  private final int taskCountMin;
+  private Integer taskCountStart;
+  private final long minTriggerScaleActionFrequencyMillis;
+  private final Double stopTaskCountRatio;
+
+  private final long metricsCollectionIntervalMillis;
+  private final long metricsCollectionRangeMillis;
+  private final long scaleActionStartDelayMillis;
+  private final long scaleActionPeriodMillis;
+
+  private final double lagWeight;
+  private final double idleWeight;
+
+  @JsonCreator
+  public CostBasedAutoScalerConfig(
+      @JsonProperty("taskCountMax") Integer taskCountMax,
+      @JsonProperty("taskCountMin") Integer taskCountMin,
+      @Nullable @JsonProperty("enableTaskAutoScaler") Boolean 
enableTaskAutoScaler,
+      @Nullable @JsonProperty("taskCountStart") Integer taskCountStart,
+      @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long 
minTriggerScaleActionFrequencyMillis,
+      @Nullable @JsonProperty("stopTaskCountRatio") Double stopTaskCountRatio,
+      @Nullable @JsonProperty("metricsCollectionIntervalMillis") Long 
metricsCollectionIntervalMillis,
+      @Nullable @JsonProperty("metricsCollectionRangeMillis") Long 
metricsCollectionRangeMillis,
+      @Nullable @JsonProperty("scaleActionStartDelayMillis") Long 
scaleActionStartDelayMillis,
+      @Nullable @JsonProperty("scaleActionPeriodMillis") Long 
scaleActionPeriodMillis,
+      @Nullable @JsonProperty("lagWeight") Double lagWeight,
+      @Nullable @JsonProperty("idleWeight") Double idleWeight
+  )
+  {
+    this.enableTaskAutoScaler = enableTaskAutoScaler != null ? 
enableTaskAutoScaler : false;
+
+    // Timing configuration with defaults
+    this.metricsCollectionIntervalMillis = metricsCollectionIntervalMillis != 
null
+                                           ? metricsCollectionIntervalMillis
+                                           : 
DEFAULT_METRICS_COLLECTION_INTERVAL_MILLIS;
+    this.metricsCollectionRangeMillis = metricsCollectionRangeMillis != null
+                                        ? metricsCollectionRangeMillis
+                                        : 
DEFAULT_METRICS_COLLECTION_RANGE_MILLIS;
+    this.scaleActionStartDelayMillis = scaleActionStartDelayMillis != null
+                                       ? scaleActionStartDelayMillis
+                                       : 
DEFAULT_SCALE_ACTION_START_DELAY_MILLIS;
+    this.scaleActionPeriodMillis = scaleActionPeriodMillis != null
+                                   ? scaleActionPeriodMillis
+                                   : DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
+    this.minTriggerScaleActionFrequencyMillis = 
minTriggerScaleActionFrequencyMillis != null
+                                                ? 
minTriggerScaleActionFrequencyMillis
+                                                : 
DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
+
+    // Cost function weights with defaults
+    this.lagWeight = lagWeight != null ? lagWeight : DEFAULT_LAG_WEIGHT;

Review Comment:
   Use the `Configs.valueOrDefault` utility method for brevity:
   ```suggestion
       this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java:
##########
@@ -47,6 +48,8 @@ public class KafkaConsumerMonitor extends AbstractMonitor
   private static final String PARTITION_TAG = "partition";
   private static final String NODE_ID_TAG = "node-id";
 
+  private static final String POLL_IDLE_RATION_METRIC_NAME = 
"poll-idle-ratio-avg";

Review Comment:
   ```suggestion
     private static final String POLL_IDLE_RATIO_METRIC_NAME = 
"poll-idle-ratio-avg";
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();

Review Comment:
   Entries are never removed from this cache, so it may keep growing without bound.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d"
+    );
+
+    this.metricBuilder = ServiceMetricEvent.builder()
+                                           
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                           .setDimension(
+                                               DruidMetrics.STREAM,
+                                               
this.supervisor.getIoConfig().getStream()
+                                           );
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(currentMetrics);
+    Runnable onSuccessfulScale = () -> currentMetrics.set(null);
+
+    metricsCollectionExec.scheduleAtFixedRate(
+        collectMetrics(),
+        config.getMetricsCollectionIntervalMillis(),
+        config.getMetricsCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    scalingDecisionExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        config.getScaleActionStartDelayMillis(),
+        config.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS

Review Comment:
   I have rarely seen anyone configure these values on the lag-based auto-scaler.
   I would suggest that we not add these configs right now and instead start with standard values (perhaps the defaults you are currently using). We can always expose them as configs later if needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d"
+    );
+
+    this.metricBuilder = ServiceMetricEvent.builder()
+                                           
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                           .setDimension(
+                                               DruidMetrics.STREAM,
+                                               
this.supervisor.getIoConfig().getStream()
+                                           );
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(currentMetrics);
+    Runnable onSuccessfulScale = () -> currentMetrics.set(null);
+
+    metricsCollectionExec.scheduleAtFixedRate(
+        collectMetrics(),
+        config.getMetricsCollectionIntervalMillis(),
+        config.getMetricsCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    scalingDecisionExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        config.getScaleActionStartDelayMillis(),
+        config.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    log.info(
+        "CostBasedAutoScaler started for dataSource [%s]: collecting metrics 
every [%d]ms, "
+        + "evaluating scaling every [%d]ms",
+        dataSource,
+        config.getMetricsCollectionIntervalMillis(),
+        config.getScaleActionPeriodMillis()
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    scalingDecisionExec.shutdownNow();
+    metricsCollectionExec.shutdownNow();
+    log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource);
+  }
+
+  @Override
+  public void reset()
+  {
+    currentMetrics.set(null);
+  }
+
+  private Runnable collectMetrics()
+  {
+    return () -> {
+      if (spec.isSuspended()) {
+        log.debug("Supervisor [%s] is suspended, skipping metrics collection", 
dataSource);
+        return;
+      }
+
+      final LagStats lagStats = supervisor.computeLagStats();
+      if (lagStats == null) {
+        log.debug("Lag stats unavailable for dataSource [%s], skipping 
collection", dataSource);
+        return;
+      }
+
+      final int currentTaskCount = supervisor.getActiveTaskGroupsCount();
+      final int partitionCount = supervisor.getPartitionCount();
+      final double avgPartitionLag = partitionCount > 0
+                                     ? (double) lagStats.getTotalLag() / 
partitionCount
+                                     : 0.0;

Review Comment:
   Doesn't `lagStats` already have the average lag pre-computed?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d"
+    );
+
+    this.metricBuilder = ServiceMetricEvent.builder()
+                                           
.setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                           .setDimension(
+                                               DruidMetrics.STREAM,
+                                               
this.supervisor.getIoConfig().getStream()
+                                           );
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> 
computeOptimalTaskCount(currentMetrics);
+    Runnable onSuccessfulScale = () -> currentMetrics.set(null);
+
+    metricsCollectionExec.scheduleAtFixedRate(
+        collectMetrics(),
+        config.getMetricsCollectionIntervalMillis(),
+        config.getMetricsCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    scalingDecisionExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
+        config.getScaleActionStartDelayMillis(),
+        config.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    log.info(
+        "CostBasedAutoScaler started for dataSource [%s]: collecting metrics 
every [%d]ms, "
+        + "evaluating scaling every [%d]ms",
+        dataSource,
+        config.getMetricsCollectionIntervalMillis(),
+        config.getScaleActionPeriodMillis()
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    scalingDecisionExec.shutdownNow();
+    metricsCollectionExec.shutdownNow();
+    log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource);
+  }
+
+  @Override
+  public void reset()
+  {
+    currentMetrics.set(null);
+  }
+
+  private Runnable collectMetrics()
+  {
+    return () -> {
+      if (spec.isSuspended()) {
+        log.debug("Supervisor [%s] is suspended, skipping metrics collection", 
dataSource);
+        return;
+      }
+
+      final LagStats lagStats = supervisor.computeLagStats();
+      if (lagStats == null) {
+        log.debug("Lag stats unavailable for dataSource [%s], skipping 
collection", dataSource);
+        return;
+      }
+
+      final int currentTaskCount = supervisor.getActiveTaskGroupsCount();

Review Comment:
   I see that the lag-based auto-scaler does this too, but I am not sure this is correct.
   This only returns the number of tasks that are actively reading. There may be tasks that have already moved to publishing. I think the more accurate measure would be the actual `ioConfig.taskCount` itself.
   
   I will need to double-check this, though. Could you please check as well?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java:
##########
@@ -31,16 +31,28 @@
 @UnstableApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", 
defaultImpl = LagBasedAutoScalerConfig.class)
 @JsonSubTypes(value = {
-        @Type(name = "lagBased", value = LagBasedAutoScalerConfig.class)
+    @Type(name = "lagBased", value = LagBasedAutoScalerConfig.class),
+    @Type(name = "costBased", value = CostBasedAutoScalerConfig.class)
 })
 public interface AutoScalerConfig
 {
   boolean getEnableTaskAutoScaler();
+
   long getMinTriggerScaleActionFrequencyMillis();
+
   int getTaskCountMax();
+
   int getTaskCountMin();
+
   Integer getTaskCountStart();
+
+  default void setTaskCountStart(int newTaskCountStart)

Review Comment:
   Please remove this change from this PR. We will tackle it as needed in 
#18745 



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java:
##########
@@ -181,4 +192,10 @@ public void stopAfterNextEmit()
   {
     stopAfterNext = true;
   }
+
+  // Use that method in the future as metrics forwarder to supervisor

Review Comment:
   ```suggestion
     /**
      * Average poll-to-idle ratio as reported by the Kafka consumer.
      * A value of 0 represents that the consumer is never idle, i.e. always 
consuming.
      * A value of 1 represents that the consumer is always idle, i.e. not 
receiving data.
      */
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -143,6 +142,15 @@ default Map<PartitionIdType, SequenceOffsetType> 
getLatestSequenceNumbers(Set<St
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * @return Kafka's `poll-idle-ratio-avg` an it's analog for Kinesis,
+   * required for correct autoscaler work
+   */

Review Comment:
   ```suggestion
     /**
      * Average poll-to-idle ratio as reported by the stream consumer.
      * A value of 0 represents that the consumer is never idle, i.e. always 
consuming.
      * A value of 1 represents that the consumer is always idle, i.e. not 
receiving data.
      * Used by the supervisor auto-scaler to find an optimal task count that 
minimizes idle time.
      */
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java:
##########
@@ -172,6 +176,13 @@ public boolean doMonitor(final ServiceEmitter emitter)
           
emitter.emit(builder.setMetric(kafkaConsumerMetric.getDruidMetricName(), 
emitValue));
         }
       }
+
+      // Capture `poll-idle-ratio-avg` metric for autoscaler purposes.
+      if (POLL_IDLE_RATION_METRIC_NAME.equals(metricName.name())) {

Review Comment:
   Since we are capturing it anyway, let's emit it too. It might be useful for debugging.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -494,7 +494,7 @@ boolean identifyMultiValuedDimensions()
       boolean isRollupOnMultiValueStringDimension = isPossiblyRollup &&
                                                     
dimensionsSpec.getDimensions()
                                                                   .stream()
-                                                                  
.anyMatch(DimensionSchema::canBeMultiValued);
+                                                                  
.anyMatch(dim -> dim.canBeMultiValued());

Review Comment:
   Please revert this if not needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Cost-based auto-scaler for seekable stream supervisors.
+ * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
+ * Task counts are selected from predefined values (not arbitrary factors).
+ * Scale-up happens incrementally, scale-down only during task rollover.
+ */
+public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(CostBasedAutoScaler.class);
+
+  private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2;
+
+  private static final Map<Integer, List<Integer>> FACTORS_CACHE = new 
HashMap<>();
+
+  private final String dataSource;
+  /**
+   * Atomic reference to CostMetrics object. All operations must be performed
+   * with sequentially consistent semantics (volatile reads/writes).
+   * However, it may be fine-tuned with acquire/release semantics,
+   * but requires careful reasoning about correctness.
+   */
+  private final AtomicReference<CostMetrics> currentMetrics;
+  private final ScheduledExecutorService metricsCollectionExec;
+  private final ScheduledExecutorService scalingDecisionExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final CostBasedAutoScalerConfig config;
+  private final ServiceEmitter emitter;
+  private final ServiceMetricEvent.Builder metricBuilder;
+  private final WeightedCostFunction costFunction;
+
+  public CostBasedAutoScaler(
+      SeekableStreamSupervisor supervisor,
+      String dataSource,
+      CostBasedAutoScalerConfig config,
+      SupervisorSpec spec,
+      ServiceEmitter emitter
+  )
+  {
+    this.dataSource = dataSource;
+    this.config = config;
+    this.spec = spec;
+    this.supervisor = supervisor;
+    this.emitter = emitter;
+
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+
+    this.currentMetrics = new AtomicReference<>(null);
+    this.costFunction = new WeightedCostFunction();
+
+    this.metricsCollectionExec = Execs.scheduledSingleThreaded(
+        StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d"
+    );
+    this.scalingDecisionExec = Execs.scheduledSingleThreaded(

Review Comment:
   Do we really need two executors? We could have just one thread that runs periodically, first collecting metrics and then performing the scaling computation using those metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to