This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 14fc44c8860 Add taskCountStart property to lag autoscaler (#17900)
14fc44c8860 is described below

commit 14fc44c88607dbaca3c46e052caa85556326e7d6
Author: jtuglu-netflix <[email protected]>
AuthorDate: Tue Apr 22 03:29:41 2025 -0700

    Add taskCountStart property to lag autoscaler (#17900)
    
    * Add taskCountStart property to lag autoscaler
    
    * Adjust for PR comments
    
    * add docs
---
 docs/ingestion/supervisor.md                       |  3 +-
 .../supervisor/KafkaSupervisorIOConfigTest.java    | 45 +++++++++++++++++++++-
 .../SeekableStreamSupervisorIOConfig.java          |  3 +-
 .../supervisor/autoscaler/AutoScalerConfig.java    |  1 +
 .../autoscaler/LagBasedAutoScalerConfig.java       | 16 ++++++++
 5 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 0abdf7f2c3c..04e31adc916 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -74,6 +74,7 @@ The following table outlines the configuration properties for 
`autoScalerConfig`
 |`enableTaskAutoScaler`|Enables the autoscaler. If not specified, Druid 
disables the autoscaler even when `autoScalerConfig` is not null.|No|`false`|
 |`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or 
equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka 
partitions or Kinesis shards, Druid sets the maximum number of reading tasks to 
the number of Kafka partitions or Kinesis shards and ignores 
`taskCountMax`.|Yes||
 |`taskCountMin`|The minimum number of ingestion tasks. When you enable the 
autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts 
with the `taskCountMin` number of tasks to launch.|Yes||
+|`taskCountStart`|The number of ingestion tasks to start with. Must be greater 
than or equal to `taskCountMin` and less than or equal to `taskCountMax`. When 
you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` 
and starts with `taskCountStart` tasks if it is specified; otherwise, it starts 
with `taskCountMin` tasks.|No|`taskCountMin`|
 |`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two 
scale actions.| No|600000|
 |`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the 
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more 
information.|No|`lagBased`|
 
@@ -416,4 +417,4 @@ See the following topics for more information:
 
 * [Supervisor API](../api-reference/supervisor-api.md) for how to manage and 
monitor supervisors using the API.
 * [Apache Kafka ingestion](../ingestion/kafka-ingestion.md) to learn about 
ingesting data from an Apache Kafka stream.
-* [Amazon Kinesis ingestion](../ingestion/kinesis-ingestion.md) to learn about 
ingesting data from an Amazon Kinesis stream.
\ No newline at end of file
+* [Amazon Kinesis ingestion](../ingestion/kinesis-ingestion.md) to learn about 
ingesting data from an Amazon Kinesis stream.
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index cb2956afd3b..211230bf9d0 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -309,7 +309,7 @@ public class KafkaSupervisorIOConfigTest
     autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
     autoScalerConfig.put("scaleActionStartDelayMillis", 0);
     autoScalerConfig.put("scaleActionPeriodMillis", 100);
-    autoScalerConfig.put("taskCountMax", 2);
+    autoScalerConfig.put("taskCountMax", 10);
     autoScalerConfig.put("taskCountMin", 1);
     autoScalerConfig.put("scaleInStep", 1);
     autoScalerConfig.put("scaleOutStep", 2);
@@ -345,11 +345,52 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertNotNull(kafkaSupervisorIOConfig1.getAutoScalerConfig());
     
Assert.assertTrue(kafkaSupervisorIOConfig1.getAutoScalerConfig().getEnableTaskAutoScaler());
     Assert.assertEquals(1, 
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMin());
-    Assert.assertEquals(2, 
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
+    Assert.assertEquals(10, 
kafkaSupervisorIOConfig1.getAutoScalerConfig().getTaskCountMax());
     Assert.assertEquals(
         1200000,
         
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
     );
+
+    autoScalerConfig.put("taskCountStart", 5);
+    kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+        "test",
+        null,
+        null,
+        1,
+        1,
+        new Period("PT1H"),
+        consumerProperties,
+        mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        false
+    );
+    Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
+
+    Assert.assertThrows(
+        "taskCountMin <= taskCountStart <= taskCountMax",
+        RuntimeException.class, () -> {
+          autoScalerConfig.put("taskCountStart", 11); // > max task count
+          mapper.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class);
+        }
+    );
+
+    Assert.assertThrows(
+        "taskCountMin <= taskCountStart <= taskCountMax",
+        RuntimeException.class, () -> {
+          autoScalerConfig.put("taskCountStart", 0); // < min task count
+          mapper.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class);
+        }
+    );
   }
 
   @Test
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 90ba05e56cc..4b3e9650cb6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -76,7 +76,8 @@ public abstract class SeekableStreamSupervisorIOConfig
     this.autoScalerConfig = autoScalerConfig;
     // if autoscaler is enable then taskcount will be ignored here. and init 
taskcount will be equal to taskCountMin
     if (autoScalerConfig != null && 
autoScalerConfig.getEnableTaskAutoScaler()) {
-      this.taskCount = autoScalerConfig.getTaskCountMin();
+      final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
+      this.taskCount = startTaskCount != null ? startTaskCount : 
autoScalerConfig.getTaskCountMin();
     } else {
       this.taskCount = taskCount != null ? taskCount : 1;
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
index 750e77328f6..afd3d0dcfc1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -39,6 +39,7 @@ public interface AutoScalerConfig
   long getMinTriggerScaleActionFrequencyMillis();
   int getTaskCountMax();
   int getTaskCountMin();
+  Integer getTaskCountStart();
   SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec, ServiceEmitter emitter);
 }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index 068e7cc4f87..60c7ff0ea9e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
@@ -42,6 +43,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
   private final double triggerScaleInFractionThreshold;
   private int taskCountMax;
   private int taskCountMin;
+  @Nullable private Integer taskCountStart;
   private final int scaleInStep;
   private final int scaleOutStep;
   private final boolean enableTaskAutoScaler;
@@ -59,6 +61,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
           @Nullable @JsonProperty("triggerScaleOutFractionThreshold") Double 
triggerScaleOutFractionThreshold,
           @Nullable @JsonProperty("triggerScaleInFractionThreshold") Double 
triggerScaleInFractionThreshold,
           @JsonProperty("taskCountMax") Integer taskCountMax,
+          @Nullable @JsonProperty("taskCountStart") Integer taskCountStart,
           @JsonProperty("taskCountMin") Integer taskCountMin,
           @Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
           @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@@ -85,9 +88,12 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
         throw new RuntimeException("taskCountMax or taskCountMin can't be 
null!");
       } else if (taskCountMax < taskCountMin) {
         throw new RuntimeException("taskCountMax can't lower than 
taskCountMin!");
+      } else if (taskCountStart != null && (taskCountStart > taskCountMax || 
taskCountStart < taskCountMin)) {
+        throw new RuntimeException("taskCountMin <= taskCountStart <= 
taskCountMax");
       }
       this.taskCountMax = taskCountMax;
       this.taskCountMin = taskCountMin;
+      this.taskCountStart = taskCountStart;
     }
 
     this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
@@ -158,6 +164,15 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
     return taskCountMin;
   }
 
+  @Override
+  @JsonProperty
+  @Nullable
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Integer getTaskCountStart()
+  {
+    return taskCountStart;
+  }
+
   @Override
   public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, 
SupervisorSpec spec, ServiceEmitter emitter)
   {
@@ -204,6 +219,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
             "enableTaskAutoScaler=" + enableTaskAutoScaler +
             ", taskCountMax=" + taskCountMax +
             ", taskCountMin=" + taskCountMin +
+            ", taskCountStart=" + taskCountStart +
             ", minTriggerScaleActionFrequencyMillis=" + 
minTriggerScaleActionFrequencyMillis +
             ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
             ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to