This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 68785badea1 Cost-based autoscaler: perform scaling action only when
metrics are available (#19028)
68785badea1 is described below
commit 68785badea1304ca82afa8327fc172c2f18289c8
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Thu Feb 19 09:31:38 2026 +0200
Cost-based autoscaler: perform scaling action only when metrics are
available (#19028)
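Changes:
- Remove the config `defaultProcessingRate`, which was only used as a fallback
to estimate the per-task processing rate when no moving-average rate was
available yet
- Perform auto-scaling only when task processing-rate metrics (moving averages)
are available; until then, the scaling action is skipped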
---
docs/ingestion/supervisor.md | 2 -
.../supervisor/autoscaler/CostBasedAutoScaler.java | 29 ++++----
.../autoscaler/CostBasedAutoScalerConfig.java | 22 ------
.../autoscaler/CostBasedAutoScalerConfigTest.java | 7 +-
.../autoscaler/CostBasedAutoScalerMockTest.java | 46 +++++++-----
.../autoscaler/CostBasedAutoScalerTest.java | 84 ++++++++++++++--------
.../autoscaler/WeightedCostFunctionTest.java | 11 ---
7 files changed, 98 insertions(+), 103 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 95991d4d5cf..c13764acd97 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -207,7 +207,6 @@ The following table outlines the configuration properties
related to the `costBa
|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale
action is triggered. | No | 600000 |
|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
|`idleWeight`|The weight of extracted poll idle value in cost function. | No |
0.75 |
-|`defaultProcessingRate`|A planned processing rate per task, required for
first cost estimations. | No | 1000 |
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when
selecting task counts.|No| `false` |
|`highLagThreshold`|Per-partition lag threshold that triggers burst scale-up
when set to a value greater than `0`. Set to a negative value to disable burst
scale-up.|No|-1|
|`minScaleDownDelay`|Minimum duration between successful scale actions,
specified as an ISO-8601 duration string.|No|`PT30M`|
@@ -230,7 +229,6 @@ The following example shows a supervisor spec with
`lagBased` autoscaler:
"minTriggerScaleActionFrequencyMillis": 600000,
"lagWeight": 0.1,
- "idleWeight": 0.9,
- "defaultProcessingRate": 100
+ "idleWeight": 0.9
}
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 8bf2e8ab787..6351c00f6d8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -36,6 +36,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
+import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -150,6 +151,11 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
public int computeTaskCountForScaleAction()
{
lastKnownMetrics = collectMetrics();
+ if (lastKnownMetrics == null) {
+ log.debug("Metrics not available for supervisorId [%s], skipping scaling
action", supervisorId);
+ return -1;
+ }
+
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
@@ -402,7 +408,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
}
/**
- * Extracts the average 15-minute moving average processing rate from task
stats.
+ * Extracts the average 15-minute, 5-minute, or 1-minute moving average
processing rate
+ * from task stats, depending on which is available, in this order.
* This rate represents the historical throughput (records per second) for
each task,
* averaged across all tasks.
*
@@ -448,7 +455,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
return count > 0 ? sum / count : -1;
}
- private CostMetrics collectMetrics()
+ @Nullable
+ CostMetrics collectMetrics()
{
if (spec.isSuspended()) {
log.debug("Supervisor [%s] is suspended, skipping a metrics collection",
supervisorId);
@@ -466,27 +474,20 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
final double movingAvgRate = extractMovingAverage(taskStats);
+ // If no moving-average processing rate is available yet, skip this scaling cycle.
+ if (movingAvgRate < 0) {
+ return null;
+ }
final double pollIdleRatio = extractPollIdleRatio(taskStats);
-
final double avgPartitionLag = lagStats.getAvgLag();
- // Use an actual 15-minute moving average processing rate if available
- final double avgProcessingRate;
- if (movingAvgRate > 0) {
- avgProcessingRate = movingAvgRate;
- } else {
- // Fallback: estimate processing rate based on the idle ratio
- final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
- avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
- }
-
return new CostMetrics(
avgPartitionLag,
currentTaskCount,
partitionCount,
pollIdleRatio,
supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
- avgProcessingRate
+ movingAvgRate
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index cb791abefb3..025da787851 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -48,7 +48,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60
* 1000; // 5 minutes
static final double DEFAULT_LAG_WEIGHT = 0.25;
static final double DEFAULT_IDLE_WEIGHT = 0.75;
- static final double DEFAULT_PROCESSING_RATE = 1000.0; // 1000 records/sec
per task as default
static final Duration DEFAULT_MIN_SCALE_DELAY =
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
private final boolean enableTaskAutoScaler;
@@ -61,7 +60,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private final double lagWeight;
private final double idleWeight;
- private final double defaultProcessingRate;
private final boolean useTaskCountBoundaries;
private final int highLagThreshold;
private final Duration minScaleDownDelay;
@@ -78,7 +76,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
@Nullable @JsonProperty("scaleActionPeriodMillis") Long
scaleActionPeriodMillis,
@Nullable @JsonProperty("lagWeight") Double lagWeight,
@Nullable @JsonProperty("idleWeight") Double idleWeight,
- @Nullable @JsonProperty("defaultProcessingRate") Double
defaultProcessingRate,
@Nullable @JsonProperty("useTaskCountBoundaries") Boolean
useTaskCountBoundaries,
@Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
@Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@@ -99,7 +96,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
// Cost function weights with defaults
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
- this.defaultProcessingRate = Configs.valueOrDefault(defaultProcessingRate,
DEFAULT_PROCESSING_RATE);
this.useTaskCountBoundaries =
Configs.valueOrDefault(useTaskCountBoundaries, false);
this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay,
DEFAULT_MIN_SCALE_DELAY);
@@ -129,7 +125,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >=
0");
- Preconditions.checkArgument(this.defaultProcessingRate > 0,
"defaultProcessingRate must be > 0");
Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0,
"minScaleDownDelay must be >= 0");
}
@@ -203,12 +198,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return idleWeight;
}
- @JsonProperty
- public double getDefaultProcessingRate()
- {
- return defaultProcessingRate;
- }
-
/**
* Enables or disables the use of task count boundaries derived from the
current partitions-per-task (PPT) ratio.
*/
@@ -273,7 +262,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
&& scaleActionPeriodMillis == that.scaleActionPeriodMillis
&& Double.compare(that.lagWeight, lagWeight) == 0
&& Double.compare(that.idleWeight, idleWeight) == 0
- && Double.compare(that.defaultProcessingRate,
defaultProcessingRate) == 0
&& useTaskCountBoundaries == that.useTaskCountBoundaries
&& Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
&& scaleDownDuringTaskRolloverOnly ==
that.scaleDownDuringTaskRolloverOnly
@@ -295,7 +283,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
scaleActionPeriodMillis,
lagWeight,
idleWeight,
- defaultProcessingRate,
useTaskCountBoundaries,
highLagThreshold,
minScaleDownDelay,
@@ -316,7 +303,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", lagWeight=" + lagWeight +
", idleWeight=" + idleWeight +
- ", defaultProcessingRate=" + defaultProcessingRate +
", useTaskCountBoundaries=" + useTaskCountBoundaries +
", highLagThreshold=" + highLagThreshold +
", minScaleDownDelay=" + minScaleDownDelay +
@@ -339,7 +325,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private Long scaleActionPeriodMillis;
private Double lagWeight;
private Double idleWeight;
- private Double defaultProcessingRate;
private Boolean useTaskCountBoundaries;
private Integer highLagThreshold;
private Duration minScaleDownDelay;
@@ -403,12 +388,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return this;
}
- public Builder defaultProcessingRate(double defaultProcessingRate)
- {
- this.defaultProcessingRate = defaultProcessingRate;
- return this;
- }
-
public Builder minScaleDownDelay(Duration minScaleDownDelay)
{
this.minScaleDownDelay = minScaleDownDelay;
@@ -445,7 +424,6 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
scaleActionPeriodMillis,
lagWeight,
idleWeight,
- defaultProcessingRate,
useTaskCountBoundaries,
highLagThreshold,
minScaleDownDelay,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index 96e15672bd1..fa50d87b56d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -29,9 +29,9 @@ import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.Cos
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
-import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_PROCESSING_RATE;
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
+@SuppressWarnings("TextBlockMigration")
public class CostBasedAutoScalerConfigTest
{
private final ObjectMapper mapper = new DefaultObjectMapper();
@@ -50,7 +50,6 @@ public class CostBasedAutoScalerConfigTest
+ " \"scaleActionPeriodMillis\": 60000,\n"
+ " \"lagWeight\": 0.6,\n"
+ " \"idleWeight\": 0.4,\n"
- + " \"defaultProcessingRate\": 2000.0,\n"
+ " \"highLagThreshold\": 30000,\n"
+ " \"minScaleDownDelay\": \"PT10M\",\n"
+ " \"scaleDownDuringTaskRolloverOnly\": true\n"
@@ -67,7 +66,6 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
- Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
Assert.assertEquals(Duration.standardMinutes(10),
config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
Assert.assertEquals(30000, config.getHighLagThreshold());
@@ -103,7 +101,6 @@ public class CostBasedAutoScalerConfigTest
);
Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
- Assert.assertEquals(DEFAULT_PROCESSING_RATE,
config.getDefaultProcessingRate(), 0.001);
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY,
config.getMinScaleDownDelay());
Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
Assert.assertNull(config.getTaskCountStart());
@@ -191,7 +188,6 @@ public class CostBasedAutoScalerConfigTest
.scaleActionPeriodMillis(60000L)
.lagWeight(0.6)
.idleWeight(0.4)
-
.defaultProcessingRate(2000.0)
.minScaleDownDelay(Duration.standardMinutes(10))
.scaleDownDuringTaskRolloverOnly(true)
.highLagThreshold(30000)
@@ -206,7 +202,6 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
- Assert.assertEquals(2000.0, config.getDefaultProcessingRate(), 0.001);
Assert.assertEquals(Duration.standardMinutes(10),
config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
Assert.assertEquals(30000, config.getHighLagThreshold());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index d0e9f90f584..218b429183b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -30,7 +29,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
@@ -96,7 +94,7 @@ public class CostBasedAutoScalerMockTest
int scaleUpOptimal = 17;
// Trigger scale-up, which should set the cooldown timer
doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 5000.0, 0.1);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 5000.0, 0.1);
Assert.assertEquals(
"Should return optimal count when it's greater than current
(scale-up)",
@@ -106,7 +104,7 @@ public class CostBasedAutoScalerMockTest
// Verify cooldown blocks immediate subsequent scaling
doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
Assert.assertEquals(
"Scale action should be blocked during the cooldown window",
-1,
@@ -123,7 +121,7 @@ public class CostBasedAutoScalerMockTest
int optimalCount = 25; // Same as current
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 100.0, 0.5);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -152,7 +150,7 @@ public class CostBasedAutoScalerMockTest
int optimalCount = 30; // Lower than current (scale-down scenario)
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
// First attempt: allowed (no prior scale action)
Assert.assertEquals(
@@ -180,7 +178,7 @@ public class CostBasedAutoScalerMockTest
// Mock computeOptimalTaskCount to return -1 (simulating null metrics
scenario)
doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 100.0, 0.5);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -201,7 +199,7 @@ public class CostBasedAutoScalerMockTest
// Mock computeOptimalTaskCount to return -1 (simulating null lag stats
scenario)
doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 100.0, 0.5);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -221,7 +219,7 @@ public class CostBasedAutoScalerMockTest
int expectedOptimalCount = 5;
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10000.0, 0.0);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10000.0, 0.0);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -241,7 +239,7 @@ public class CostBasedAutoScalerMockTest
int expectedOptimalCount = 100; // Maximum allowed
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 50000.0, 0.0);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 50000.0, 0.0);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -261,7 +259,7 @@ public class CostBasedAutoScalerMockTest
int expectedOptimalCount = 26; // Just one more than current
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 1000.0, 0.2);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 1000.0, 0.2);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -281,7 +279,7 @@ public class CostBasedAutoScalerMockTest
int optimalCount = 24; // Just one less than current
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.8);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.8);
int result = autoScaler.computeTaskCountForScaleAction();
@@ -314,7 +312,7 @@ public class CostBasedAutoScalerMockTest
int optimalCount = 30; // Lower than current (scale-down scenario)
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
Assert.assertEquals(
"Should return -1 when scaleDownDuringTaskRolloverOnly is true",
@@ -346,7 +344,7 @@ public class CostBasedAutoScalerMockTest
// Set up lastKnownMetrics by calling computeTaskCountForScaleAction first
without scaling
doReturn(currentTaskCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
autoScaler.computeTaskCountForScaleAction(); // This populates
lastKnownMetrics
doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
@@ -357,12 +355,22 @@ public class CostBasedAutoScalerMockTest
);
}
- private void setupMocksForMetricsCollection(int taskCount, double avgLag,
double pollIdleRatio)
+ private void setupMocksForMetricsCollection(
+ CostBasedAutoScaler autoScaler,
+ int taskCount,
+ double avgLag,
+ double pollIdleRatio
+ )
{
- when(mockSupervisor.computeLagStats()).thenReturn(new LagStats(0, (long)
avgLag * 2, (long) avgLag));
- when(mockIoConfig.getTaskCount()).thenReturn(taskCount);
- when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
- when(mockSupervisor.getStats()).thenReturn(Collections.emptyMap());
+ CostMetrics metrics = new CostMetrics(
+ avgLag,
+ taskCount,
+ PARTITION_COUNT,
+ pollIdleRatio,
+ TASK_DURATION_SECONDS,
+ AVG_PROCESSING_RATE
+ );
+ doReturn(metrics).when(autoScaler).collectMetrics();
}
private CostMetrics createMetrics(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index bb6eca691a2..1fcb78e450e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@@ -41,7 +42,7 @@ import static
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeter
import static
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts;
import static org.mockito.Mockito.when;
-@SuppressWarnings({"SameParameterValue", "InstantiationOfUtilityClass"})
+@SuppressWarnings({"SameParameterValue"})
public class CostBasedAutoScalerTest
{
private CostBasedAutoScaler autoScaler;
@@ -69,7 +70,6 @@ public class CostBasedAutoScalerTest
autoScaler = new CostBasedAutoScaler(mockSupervisor, config,
mockSupervisorSpec, mockEmitter);
}
- @SuppressWarnings("InstantiationOfUtilityClass")
@Test
public void testComputeValidTaskCounts()
{
@@ -139,7 +139,15 @@ public class CostBasedAutoScalerTest
Assert.assertFalse("High lag should not jump straight to max (30) from 3",
contains(highLagCounts, 30));
// Respects taskCountMax
- int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 1, 3,
useTaskCountBoundaries, highLagThreshold);
+ int[] cappedCounts = computeValidTaskCounts(
+ 30,
+ 4,
+ highAggregateLag,
+ 1,
+ 3,
+ useTaskCountBoundaries,
+ highLagThreshold
+ );
Assert.assertTrue("Should include taskCountMax when within bounds",
contains(cappedCounts, 3));
Assert.assertFalse("Should not exceed taskCountMax",
contains(cappedCounts, 4));
@@ -186,9 +194,14 @@ public class CostBasedAutoScalerTest
public void testExtractPollIdleRatio()
{
// Null and empty return 0
- Assert.assertEquals("Null stats should yield 0 idle ratio", 0.,
CostBasedAutoScaler.extractPollIdleRatio(null), 0.0001);
Assert.assertEquals(
- "Empty stats should yield 0 idle ratio",
+ "Null stats should yield 0 idle ratios",
+ 0.,
+ CostBasedAutoScaler.extractPollIdleRatio(null),
+ 0.0001
+ );
+ Assert.assertEquals(
+ "Empty stats should yield 0 idle ratios",
0.,
CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()),
0.0001
@@ -198,7 +211,7 @@ public class CostBasedAutoScalerTest
Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new
HashMap<>()));
Assert.assertEquals(
- "Missing autoscaler metrics should yield 0 idle ratio",
+ "Missing autoscaler metrics should yield 0 idle ratios",
0.,
CostBasedAutoScaler.extractPollIdleRatio(missingMetrics),
0.0001
@@ -233,7 +246,7 @@ public class CostBasedAutoScalerTest
taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new
HashMap<>());
emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
Assert.assertEquals(
- "Empty autoscaler metrics should yield 0 idle ratio",
+ "Empty autoscaler metrics should yield 0 idle ratios",
0.,
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler),
0.0001
@@ -270,7 +283,12 @@ public class CostBasedAutoScalerTest
public void testExtractMovingAverage()
{
// Null and empty return -1
- Assert.assertEquals("Null stats should yield -1 moving average", -1.,
CostBasedAutoScaler.extractMovingAverage(null), 0.0001);
+ Assert.assertEquals(
+ "Null stats should yield -1 moving average",
+ -1.,
+ CostBasedAutoScaler.extractMovingAverage(null),
+ 0.0001
+ );
Assert.assertEquals(
"Empty stats should yield -1 moving average",
-1.,
@@ -418,7 +436,6 @@ public class CostBasedAutoScalerTest
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("stream");
- // Test config defaults for minScaleDownDelay, defaultProcessingRate,
scaleDownDuringTaskRolloverOnly
CostBasedAutoScalerConfig cfgWithDefaults =
CostBasedAutoScalerConfig.builder()
.taskCountMax(10)
.taskCountMin(1)
@@ -428,7 +445,6 @@ public class CostBasedAutoScalerTest
CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY,
cfgWithDefaults.getMinScaleDownDelay()
);
- Assert.assertEquals(1000.0, cfgWithDefaults.getDefaultProcessingRate(),
0.001);
Assert.assertFalse(cfgWithDefaults.isScaleDownOnTaskRolloverOnly());
// Test custom config values
@@ -437,11 +453,9 @@ public class CostBasedAutoScalerTest
.taskCountMin(1)
.enableTaskAutoScaler(true)
.minScaleDownDelay(Duration.standardMinutes(10))
-
.defaultProcessingRate(5000.0)
.scaleDownDuringTaskRolloverOnly(true)
.build();
Assert.assertEquals(Duration.standardMinutes(10),
cfgWithCustom.getMinScaleDownDelay());
- Assert.assertEquals(5000.0, cfgWithCustom.getDefaultProcessingRate(),
0.001);
Assert.assertTrue(cfgWithCustom.isScaleDownOnTaskRolloverOnly());
// computeTaskCountForRollover returns -1 when
scaleDownDuringTaskRolloverOnly=false (default)
@@ -454,29 +468,41 @@ public class CostBasedAutoScalerTest
Assert.assertEquals(-1,
scalerWithRolloverOnly.computeTaskCountForRollover());
}
- private CostMetrics createMetrics(
- double avgPartitionLag,
- int currentTaskCount,
- int partitionCount,
- double pollIdleRatio
- )
+ @Test
+ public void testScalingActionSkippedWhenMovingAverageRateUnavailable()
{
- return new CostMetrics(
- avgPartitionLag,
- currentTaskCount,
- partitionCount,
- pollIdleRatio,
- 3600,
- 1000.0
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor =
Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig =
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("test-supervisor");
+ when(spec.isSuspended()).thenReturn(false);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("test-stream");
+ when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
+ // No task stats means the moving average rate is unavailable
+ when(supervisor.getStats()).thenReturn(Collections.emptyMap());
+
+ CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder()
+
.taskCountMax(10)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+ .build();
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, config,
spec, emitter);
+
+ Assert.assertEquals(
+ "No scaling action should be requested when the moving average rate is
unavailable",
+ -1,
+ scaler.computeTaskCountForScaleAction()
);
}
- private CostMetrics createMetricsWithRate(
+ private CostMetrics createMetrics(
double avgPartitionLag,
int currentTaskCount,
int partitionCount,
- double pollIdleRatio,
- double avgProcessingRate
+ double pollIdleRatio
)
{
return new CostMetrics(
@@ -485,7 +511,7 @@ public class CostBasedAutoScalerTest
partitionCount,
pollIdleRatio,
3600,
- avgProcessingRate
+ 1000.0
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 48285c6efc4..c4d4f493498 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -38,7 +38,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(0.3)
.idleWeight(0.7)
- .defaultProcessingRate(1000.0)
.build();
}
@@ -67,7 +66,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(1.0)
.idleWeight(0.0)
-
.defaultProcessingRate(100.0)
.build();
CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3);
@@ -94,7 +92,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(1.0)
.idleWeight(0.0)
-
.defaultProcessingRate(1000.0)
.build();
// aggregateLag = 100000 * 100 = 10,000,000; lagPerPartition = 100,000
@@ -138,7 +135,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(1.0)
.idleWeight(0.0)
-
.defaultProcessingRate(1000.0)
.build();
CostBasedAutoScalerConfig idleOnly = CostBasedAutoScalerConfig.builder()
@@ -147,7 +143,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(0.0)
.idleWeight(1.0)
-
.defaultProcessingRate(1000.0)
.build();
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.1);
@@ -196,7 +191,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(1.0)
.idleWeight(0.0)
-
.defaultProcessingRate(1000.0)
.build();
double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount
+ 5, lagOnlyConfig).totalCost();
@@ -221,7 +215,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(0.0)
.idleWeight(1.0)
-
.defaultProcessingRate(1000.0)
.build();
// Current: 10 tasks with 40% idle (60% busy)
@@ -247,7 +240,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(0.0)
.idleWeight(1.0)
-
.defaultProcessingRate(1000.0)
.build();
// Extreme scale-down: 10 tasks → 2 tasks with 40% idle
@@ -277,7 +269,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(0.0)
.idleWeight(1.0)
-
.defaultProcessingRate(1000.0)
.build();
// Negative idle ratio indicates missing data → should default to 0.5
@@ -302,7 +293,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(1.0)
.idleWeight(0.0)
-
.defaultProcessingRate(1000.0)
.build();
int currentTaskCount = 10;
@@ -333,7 +323,6 @@ public class WeightedCostFunctionTest
.enableTaskAutoScaler(true)
.lagWeight(1.0)
.idleWeight(0.0)
-
.defaultProcessingRate(1000.0)
.build();
int currentTaskCount = 10;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]