This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 14fc44c8860 Add taskCountStart property to lag autoscaler (#17900)
14fc44c8860 is described below
commit 14fc44c88607dbaca3c46e052caa85556326e7d6
Author: jtuglu-netflix <[email protected]>
AuthorDate: Tue Apr 22 03:29:41 2025 -0700
Add taskCountStart property to lag autoscaler (#17900)
* Add taskCountStart property to lag autoscaler
* Adjust for PR comments
* add docs
---
docs/ingestion/supervisor.md | 3 +-
.../supervisor/KafkaSupervisorIOConfigTest.java | 45 +++++++++++++++++++++-
.../SeekableStreamSupervisorIOConfig.java | 3 +-
.../supervisor/autoscaler/AutoScalerConfig.java | 1 +
.../autoscaler/LagBasedAutoScalerConfig.java | 16 ++++++++
5 files changed, 64 insertions(+), 4 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 0abdf7f2c3c..04e31adc916 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -74,6 +74,7 @@ The following table outlines the configuration properties for
`autoScalerConfig`
|`enableTaskAutoScaler`|Enables the autoscaler. If not specified, Druid
disables the autoscaler even when `autoScalerConfig` is not null.|No|`false`|
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or
equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka
partitions or Kinesis shards, Druid sets the maximum number of reading tasks to
the number of Kafka partitions or Kinesis shards and ignores
`taskCountMax`.|Yes||
|`taskCountMin`|The minimum number of ingestion tasks. When you enable the
autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts
with the `taskCountMin` number of tasks to launch.|Yes||
+|`taskCountStart`|The number of ingestion tasks to start with. When you enable
the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts
with `taskCountStart` tasks if it is specified, or with `taskCountMin` tasks
otherwise. Must be greater than or equal to `taskCountMin` and less than or equal to `taskCountMax`.|No|`taskCountMin`|
|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two
scale actions.| No|600000|
|`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more
information.|No|`lagBased`|
@@ -416,4 +417,4 @@ See the following topics for more information:
* [Supervisor API](../api-reference/supervisor-api.md) for how to manage and
monitor supervisors using the API.
* [Apache Kafka ingestion](../ingestion/kafka-ingestion.md) to learn about
ingesting data from an Apache Kafka stream.
-* [Amazon Kinesis ingestion](../ingestion/kinesis-ingestion.md) to learn about
ingesting data from an Amazon Kinesis stream.
\ No newline at end of file
+* [Amazon Kinesis ingestion](../ingestion/kinesis-ingestion.md) to learn about
ingesting data from an Amazon Kinesis stream.
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index cb2956afd3b..211230bf9d0 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -309,7 +309,7 @@ public class KafkaSupervisorIOConfigTest
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
- autoScalerConfig.put("taskCountMax", 2);
+ autoScalerConfig.put("taskCountMax", 10);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
@@ -345,11 +345,52 @@ public class KafkaSupervisorIOConfigTest
Assert.assertNotNull(kafkaSupervisorIOConfig1.getAutoScalerConfig());
Assert.assertTrue(kafkaSupervisorIOConfig1.getAutoScalerConfig().getEnableTaskAutoScaler());
Assert.assertEquals(1,
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMin());
- Assert.assertEquals(2,
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
+ Assert.assertEquals(10,
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
Assert.assertEquals(
1200000,
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
);
+
+ autoScalerConfig.put("taskCountStart", 5);
+ kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+ "test",
+ null,
+ null,
+ 1,
+ 1,
+ new Period("PT1H"),
+ consumerProperties,
+ mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ );
+ Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
+
+ Assert.assertThrows(
+ "taskCountMin <= taskCountStart <= taskCountMax",
+ RuntimeException.class, () -> {
+ autoScalerConfig.put("taskCountStart", 11); // > max task count
+ mapper.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class);
+ }
+ );
+
+ Assert.assertThrows(
+ "taskCountMin <= taskCountStart <= taskCountMax",
+ RuntimeException.class, () -> {
+ autoScalerConfig.put("taskCountStart", 0); // < min task count
+ mapper.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class);
+ }
+ );
}
@Test
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 90ba05e56cc..4b3e9650cb6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -76,7 +76,8 @@ public abstract class SeekableStreamSupervisorIOConfig
this.autoScalerConfig = autoScalerConfig;
// if autoscaler is enable then taskcount will be ignored here. and init
taskcount will be equal to taskCountMin
if (autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoScaler()) {
- this.taskCount = autoScalerConfig.getTaskCountMin();
+ final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
+ this.taskCount = startTaskCount != null ? startTaskCount :
autoScalerConfig.getTaskCountMin();
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
index 750e77328f6..afd3d0dcfc1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -39,6 +39,7 @@ public interface AutoScalerConfig
long getMinTriggerScaleActionFrequencyMillis();
int getTaskCountMax();
int getTaskCountMin();
+ Integer getTaskCountStart();
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec, ServiceEmitter emitter);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index 068e7cc4f87..60c7ff0ea9e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
@@ -42,6 +43,7 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
private final double triggerScaleInFractionThreshold;
private int taskCountMax;
private int taskCountMin;
+ @Nullable private Integer taskCountStart;
private final int scaleInStep;
private final int scaleOutStep;
private final boolean enableTaskAutoScaler;
@@ -59,6 +61,7 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
@Nullable @JsonProperty("triggerScaleOutFractionThreshold") Double
triggerScaleOutFractionThreshold,
@Nullable @JsonProperty("triggerScaleInFractionThreshold") Double
triggerScaleInFractionThreshold,
@JsonProperty("taskCountMax") Integer taskCountMax,
+ @Nullable @JsonProperty("taskCountStart") Integer taskCountStart,
@JsonProperty("taskCountMin") Integer taskCountMin,
@Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@@ -85,9 +88,12 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
throw new RuntimeException("taskCountMax or taskCountMin can't be
null!");
} else if (taskCountMax < taskCountMin) {
throw new RuntimeException("taskCountMax can't lower than
taskCountMin!");
+ } else if (taskCountStart != null && (taskCountStart > taskCountMax ||
taskCountStart < taskCountMin)) {
+ throw new RuntimeException("taskCountMin <= taskCountStart <=
taskCountMax");
}
this.taskCountMax = taskCountMax;
this.taskCountMin = taskCountMin;
+ this.taskCountStart = taskCountStart;
}
this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
@@ -158,6 +164,15 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
return taskCountMin;
}
+ @Override
+ @JsonProperty
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getTaskCountStart()
+ {
+ return taskCountStart;
+ }
+
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor,
SupervisorSpec spec, ServiceEmitter emitter)
{
@@ -204,6 +219,7 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
"enableTaskAutoScaler=" + enableTaskAutoScaler +
", taskCountMax=" + taskCountMax +
", taskCountMin=" + taskCountMin +
+ ", taskCountStart=" + taskCountStart +
", minTriggerScaleActionFrequencyMillis=" +
minTriggerScaleActionFrequencyMillis +
", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]