This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c57bd3b4388 supervisor/autoscaler: Skip scaling when partitions are less than minTaskCount (#17335)
c57bd3b4388 is described below

commit c57bd3b4388cad4ec0087d0951b07648ea378cab
Author: Adithya Chakilam <[email protected]>
AuthorDate: Tue Oct 15 16:12:53 2024 -0500

    supervisor/autoscaler: Skip scaling when partitions are less than minTaskCount (#17335)
---
 .../supervisor/autoscaler/LagBasedAutoScaler.java  | 16 +++----
 .../SeekableStreamSupervisorSpecTest.java          | 52 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 8 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index ec81c5f9f99..22e36841199 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -216,17 +216,16 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
     int desiredActiveTaskCount;
+    int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
 
     if (beyondProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
       // Do Scale out
       int taskCount = currentActiveTaskCount + 
lagBasedAutoScalerConfig.getScaleOutStep();
 
-      int partitionCount = supervisor.getPartitionCount();
-      if (partitionCount <= 0) {
-        log.warn("Partition number for [%s] <= 0 ? how can it be?", 
dataSource);
-        return -1;
-      }
-
       int actualTaskCountMax = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
       if (currentActiveTaskCount == actualTaskCountMax) {
         log.warn("CurrentActiveTaskCount reached task count Max limit, 
skipping scale out action for dataSource [%s].",
@@ -248,7 +247,8 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
     if (withinProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
       // Do Scale in
       int taskCount = currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep();
-      if (currentActiveTaskCount == 
lagBasedAutoScalerConfig.getTaskCountMin()) {
+      int actualTaskCountMin = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+      if (currentActiveTaskCount == actualTaskCountMin) {
         log.warn("CurrentActiveTaskCount reached task count Min limit, 
skipping scale in action for dataSource [%s].",
             dataSource
         );
@@ -260,7 +260,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
                          
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
taskCount));
         return -1;
       } else {
-        desiredActiveTaskCount = Math.max(taskCount, 
lagBasedAutoScalerConfig.getTaskCountMin());
+        desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
       }
       return desiredActiveTaskCount;
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index baff5fc765b..3281360f580 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -992,6 +992,58 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     autoScaler.stop();
   }
 
+  @Test
+  public void 
testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() 
throws InterruptedException
+  {
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, 
false)).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
+    Map<String, Object> modifiedScaleInProps = getScaleInProperties();
+
+    modifiedScaleInProps.put("taskCountMax", 20);
+    modifiedScaleInProps.put("taskCountMin", 15);
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            modifiedScaleInProps,
+            LagBasedAutoScalerConfig.class
+        ),
+        spec,
+        emitter
+    );
+
+    // enable autoscaler so that taskcount config will be ignored and init 
value of taskCount will use taskCountMin.
+    Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+    supervisor.getIoConfig().setTaskCount(2);
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+
+    Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
+    Thread.sleep(2000);
+    Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
   @Test
   public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws 
InterruptedException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to